Database transactions are awesome! The ability to make multiple changes to data in various tables and to commit those changes so that they all appear to other sessions at once, or to roll them all back as if nothing had happened, is one of the most powerful features of relational databases.
… and here’s the “but”: sometimes you don’t want to do all the processing in one transaction. For example, with today’s online systems we might want to do the minimum processing needed to fulfil an order in a single transaction and defer all non-immediate processing to maximise throughput. “Near real time” would be the way to describe our deferred processing. There are various ways to do this; one is via queues with callback routines.
So, how do we go about doing this? Here’s a high-level summary of the steps involved:
- Create an object type with the details that the processing will require
- Create a queue whose payload is the object type
- Create a callback routine that will be run whenever an item is queued
- Associate the callback routine with the queue so that Oracle knows to call it
Getting into some solid detail, we’ll use the EMP table from the SCOTT schema. We’ll assume our business processing has two functions that need to be done asynchronously: updating an employee’s salary and transferring an employee to a different department (okay, probably not great scenarios for asynchronous processing but I wanted something simple using the EMP table…)
Since we’ll be working within the SCOTT schema we need to ensure SCOTT can perform Advanced Queuing operations, so from a suitably privileged account we grant SCOTT execute on the packages we will use:
GRANT EXECUTE ON dbms_aqadm TO scott;
GRANT EXECUTE ON dbms_aq TO scott;
Next we will need to create our object type that the queue will hold. For this example, the type will hold the necessary EMP attributes we need to know about and also an attribute that will store a description of the operation to perform:
CREATE OR REPLACE TYPE t_emp_event
AS OBJECT
   (event_operation VARCHAR2 (20)
   ,empno           NUMBER (4)
   ,sal             NUMBER (7,2)
   ,mgr             NUMBER (4)
   ,deptno          NUMBER (2)
   )
/
Next up, we create our queue on this object type:
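BEGIN
   -- the queue table physically stores the messages
   dbms_aqadm.create_queue_table (queue_table        => 'SCOTT.EMP_EVENT_QT'
                                 ,queue_payload_type => 'T_EMP_EVENT'
                                 ,multiple_consumers => FALSE);

   -- the queue itself lives in the queue table
   dbms_aqadm.create_queue (queue_name  => 'SCOTT.EMP_EVENT_Q'
                           ,queue_table => 'SCOTT.EMP_EVENT_QT');

   -- a queue must be started before it accepts enqueues and dequeues
   dbms_aqadm.start_queue (queue_name => 'SCOTT.EMP_EVENT_Q');
END;
/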
We can now enqueue messages onto our queue with a routine like the following:
PROCEDURE enqueue (p_msg IN t_emp_event)
IS
   l_enqueue_options    dbms_aq.enqueue_options_t;
   l_message_properties dbms_aq.message_properties_t;
   l_msgid              RAW(16);
BEGIN
   dbms_aq.enqueue (queue_name         => 'SCOTT.EMP_EVENT_Q'
                   ,enqueue_options    => l_enqueue_options
                   ,message_properties => l_message_properties
                   ,payload            => p_msg
                   ,msgid              => l_msgid);
END enqueue;
Before we can process our messages we need to set up our callback routine, which has a specific call interface. You can read about what this interface needs to look like in the Oracle documentation:
PROCEDURE emp_event_callback
   (context  RAW
   ,reginfo  SYS.AQ$_REG_INFO
   ,descr    SYS.AQ$_DESCRIPTOR
   ,payload  RAW
   ,payloadl NUMBER)
IS
   l_dequeue_options    dbms_aq.dequeue_options_t;
   l_message_properties dbms_aq.message_properties_t;
   l_message_handle     RAW(16);
   l_payload            t_emp_event;
BEGIN
   -- dequeue exactly the message we were notified about, without waiting
   l_dequeue_options.msgid := descr.msg_id;
   l_dequeue_options.wait  := dbms_aq.no_wait;

   dbms_aq.dequeue (queue_name         => descr.queue_name
                   ,dequeue_options    => l_dequeue_options
                   ,message_properties => l_message_properties
                   ,payload            => l_payload
                   ,msgid              => l_message_handle);

   -- here's what we'll do with the message... in this case call
   -- out to a separate processing routine (not shown here)
   process_message (p_emp_event => l_payload);

   -- the dequeue is part of this session's transaction, so a COMMIT is
   -- needed (here or in process_message) to remove the message permanently
END emp_event_callback;
The routine above assumes the processing of the message will be done by some other routine called “process_message”. In a full blown solution (see later on) this routine would be part of a package that captures all the logic associated with the EMP event processing.
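To give a rough idea of what “process_message” might look like, here is a minimal sketch. It is an assumption, not the code from the downloadable package: the operation names UPDATE_SAL and CHANGE_DEPT are taken from the log output shown later in this post, while the handling of MGR, the error number and the placement of the COMMIT are guesses at one reasonable approach.

```sql
-- Hypothetical stand-alone version of process_message (a sketch only)
CREATE OR REPLACE PROCEDURE process_message (p_emp_event IN t_emp_event)
IS
BEGIN
   CASE p_emp_event.event_operation
      WHEN 'UPDATE_SAL' THEN
         UPDATE emp
         SET    sal   = p_emp_event.sal
         WHERE  empno = p_emp_event.empno;
      WHEN 'CHANGE_DEPT' THEN
         -- assumption: only change the manager if the event carries one
         UPDATE emp
         SET    deptno = p_emp_event.deptno
         ,      mgr    = NVL(p_emp_event.mgr, mgr)
         WHERE  empno  = p_emp_event.empno;
      ELSE
         -- unknown or missing operation: fail loudly rather than ignore it
         raise_application_error
            (-20000
            ,'Unknown event operation: ' || p_emp_event.event_operation);
   END CASE;

   -- makes both the data change and the dequeue done by the caller permanent
   COMMIT;
END process_message;
/
```

Dispatching on the event_operation attribute keeps a single queue and a single callback for every kind of EMP event, at the cost of one more branch whenever a new operation is introduced.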
Now that the callback routine has been defined we register it so that Oracle will call it whenever something is placed in our queue. To do this we need to call the DBMS_AQ.REGISTER routine, telling it the queue and the processing routine it needs to call:
BEGIN
   dbms_aq.register
      (sys.aq$_reg_info_list
         (sys.aq$_reg_info
            ('SCOTT.EMP_EVENT_Q'                 -- the queue
            ,DBMS_AQ.NAMESPACE_AQ
            ,'plsql://SCOTT.EMP_EVENT_CALLBACK'  -- this is the routine that will get called when a message is queued
            ,NULL)
         )
      ,1
      );
END;
/
… and that’s pretty much all there is to it. Keep in mind that the messages on the queue are transactional so they will not be processed until the session that enqueues the messages commits, but this is exactly the behaviour that we would normally desire.
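The transactional behaviour can be checked with a small experiment. The block below is a sketch that assumes it is run as SCOTT against the queue created above, using the default enqueue options; it enqueues a message, rolls back and then counts what is left via the AQ$EMP_EVENT_QT view that Oracle creates alongside the queue table. Since the enqueue is part of the transaction, the count should come back as zero and the callback should never fire for this message.

```sql
SET SERVEROUTPUT ON

DECLARE
   l_enqueue_options    dbms_aq.enqueue_options_t;
   l_message_properties dbms_aq.message_properties_t;
   l_msgid              RAW(16);
   -- attributes in order: event_operation, empno, sal, mgr, deptno
   l_event              t_emp_event := t_emp_event ('UPDATE_SAL', 7654, 1600, NULL, NULL);
   l_count              PLS_INTEGER;
BEGIN
   dbms_aq.enqueue (queue_name         => 'SCOTT.EMP_EVENT_Q'
                   ,enqueue_options    => l_enqueue_options
                   ,message_properties => l_message_properties
                   ,payload            => l_event
                   ,msgid              => l_msgid);

   -- note: this undoes everything in the current transaction,
   -- not just the enqueue, so run it in a clean session
   ROLLBACK;

   SELECT COUNT(*)
   INTO   l_count
   FROM   aq$emp_event_qt
   WHERE  msg_id = l_msgid;

   dbms_output.put_line ('Messages left after rollback: ' || l_count);
END;
/
```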
Of course it is much simpler to encapsulate all of the above inside packaged routines. For the above EMP event scenario I have constructed two packages, available for download:
- emp_event_management, which handles the setting up of the queue and registering the callback routine
- emp_event_processing, which contains the routines to queue up events and to process those same event messages
Once all the complicated DBMS_AQ calls are hidden behind a management API, starting up the event processing is just:
EXEC emp_event_management.setup
EXEC emp_event_management.startup
Shutting things down is:
EXEC emp_event_management.shutdown
EXEC emp_event_management.teardown
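The package source isn’t reproduced here, but shutting down and tearing down presumably come down to undoing the earlier steps in reverse order. The following is a sketch of the underlying calls under that assumption, not a copy of what the downloadable package does:

```sql
BEGIN
   -- stop Oracle calling the callback (mirrors the earlier REGISTER call)
   dbms_aq.unregister
      (sys.aq$_reg_info_list
         (sys.aq$_reg_info
            ('SCOTT.EMP_EVENT_Q'
            ,DBMS_AQ.NAMESPACE_AQ
            ,'plsql://SCOTT.EMP_EVENT_CALLBACK'
            ,NULL)
         )
      ,1
      );

   -- a queue has to be stopped before it can be dropped
   dbms_aqadm.stop_queue (queue_name => 'SCOTT.EMP_EVENT_Q');
   dbms_aqadm.drop_queue (queue_name => 'SCOTT.EMP_EVENT_Q');

   -- once its queues are gone the queue table can be dropped too
   dbms_aqadm.drop_queue_table (queue_table => 'SCOTT.EMP_EVENT_QT');
END;
/
```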
The package routines write processing messages to a table named MSG_LOG. I’ve gone a bit overboard with the messages written out to help understand what’s going on. The messages are timestamped so the speed of processing can be monitored.
Creating events for processing is a matter of making calls to our processing package, similar to these:
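For anyone following along without the download, a logging table along these lines would do. This is a sketch only: the column names come from the query output shown later, whereas the data types, the MSG_LOG_SEQ sequence and the LOG_MSG procedure are assumptions made for illustration. Skip the CREATE TABLE if the downloaded scripts have already created MSG_LOG.

```sql
CREATE TABLE msg_log
   (msg_id        NUMBER     NOT NULL
   ,msg_timestamp TIMESTAMP  NOT NULL
   ,msg           VARCHAR2(200)
   );

-- hypothetical sequence used to number the log entries
CREATE SEQUENCE msg_log_seq;

-- hypothetical logging routine; the autonomous transaction means the log
-- entry is kept even if the caller's own transaction later rolls back
CREATE OR REPLACE PROCEDURE log_msg (p_msg IN VARCHAR2)
IS
   PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
   INSERT INTO msg_log (msg_id, msg_timestamp, msg)
   VALUES (msg_log_seq.NEXTVAL, SYSTIMESTAMP, SUBSTR(p_msg, 1, 200));
   COMMIT;
END log_msg;
/
```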
EXEC emp_event_processing.update_sal (p_empno => 7654,p_sal => 1600)
EXEC emp_event_processing.update_dept (p_empno => 7654,p_deptno => 20)
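Behind calls like these, each routine only has to build a T_EMP_EVENT and hand it to the enqueue routine shown earlier. Here is a minimal sketch of how that could be arranged; the package name EMP_EVENT_SKETCH is made up so as not to clash with the real package, and leaving MGR as NULL is an assumption, since the downloadable code may well look the new manager up.

```sql
CREATE OR REPLACE PACKAGE emp_event_sketch
AS
   PROCEDURE update_sal  (p_empno IN emp.empno%TYPE, p_sal    IN emp.sal%TYPE);
   PROCEDURE update_dept (p_empno IN emp.empno%TYPE, p_deptno IN emp.deptno%TYPE);
END emp_event_sketch;
/

CREATE OR REPLACE PACKAGE BODY emp_event_sketch
AS
   -- private helper: same enqueue logic as shown earlier in the post
   PROCEDURE enqueue (p_msg IN t_emp_event)
   IS
      l_enqueue_options    dbms_aq.enqueue_options_t;
      l_message_properties dbms_aq.message_properties_t;
      l_msgid              RAW(16);
   BEGIN
      dbms_aq.enqueue (queue_name         => 'SCOTT.EMP_EVENT_Q'
                      ,enqueue_options    => l_enqueue_options
                      ,message_properties => l_message_properties
                      ,payload            => p_msg
                      ,msgid              => l_msgid);
   END enqueue;

   PROCEDURE update_sal (p_empno IN emp.empno%TYPE, p_sal IN emp.sal%TYPE)
   IS
   BEGIN
      -- attributes in order: event_operation, empno, sal, mgr, deptno
      enqueue (t_emp_event ('UPDATE_SAL', p_empno, p_sal, NULL, NULL));
   END update_sal;

   PROCEDURE update_dept (p_empno IN emp.empno%TYPE, p_deptno IN emp.deptno%TYPE)
   IS
   BEGIN
      enqueue (t_emp_event ('CHANGE_DEPT', p_empno, NULL, NULL, p_deptno));
   END update_dept;
END emp_event_sketch;
/
```

Neither routine commits, so the caller stays in control of when, or whether, the queued events become visible to the callback.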
By way of demonstration, let’s execute the above commands against the standard SCOTT.EMP table. We’ll grab the current time so as to filter our MSG_LOG table later on, view the data we’re about to change, queue the messages, note the current time so we can see how long the callback processing takes, list the MSG_LOG entries and lastly view the data again to confirm the changes:
VARIABLE now VARCHAR2(30)
EXEC :now := TO_CHAR(SYSTIMESTAMP,'dd/mm/yyyy hh24:mi:ss.ff')

SELECT empno
,      ename
,      sal
,      deptno
,      mgr
FROM   emp
WHERE  empno = 7654
/

EXEC emp_event_processing.update_sal (p_empno => 7654,p_sal => 1600)
EXEC emp_event_processing.update_dept (p_empno => 7654,p_deptno => 20)

SELECT TO_CHAR(SYSTIMESTAMP,'hh24:mi:ss.ff') AS current_time FROM dual;

COMMIT;

SELECT *
FROM   msg_log
WHERE  msg_timestamp >= TO_TIMESTAMP(:now,'dd/mm/yyyy hh24:mi:ss.ff')
ORDER  BY
       msg_id DESC
/

SELECT empno
,      ename
,      sal
,      deptno
,      mgr
FROM   emp
WHERE  empno = 7654
/
The output of which is:
SQL> VARIABLE now VARCHAR2(30)
SQL> EXEC :now := TO_CHAR(SYSTIMESTAMP,'dd/mm/yyyy hh24:mi:ss.ff')
PL/SQL procedure successfully completed.
SQL> SELECT empno
2 , ename
3 , sal
4 , deptno
5 , mgr
6 FROM emp
7 WHERE empno = 7654
8 /
     EMPNO ENAME             SAL     DEPTNO        MGR
---------- ---------- ---------- ---------- ----------
      7654 MARTIN           1250         30       7698
SQL> EXEC emp_event_processing.update_sal (p_empno => 7654,p_sal => 1600)
PL/SQL procedure successfully completed.
SQL> EXEC emp_event_processing.update_dept (p_empno => 7654,p_deptno => 20)
PL/SQL procedure successfully completed.
SQL> SELECT TO_CHAR(SYSTIMESTAMP,'hh24:mi:ss.ff') AS current_time FROM dual;
CURRENT_TIME
------------------
07:09:02.313000
SQL> COMMIT;
Commit complete.
SQL>
SQL> SELECT *
2 FROM msg_log
3 WHERE msg_timestamp >= TO_TIMESTAMP(:now,'dd/mm/yyyy hh24:mi:ss.ff')
4 ORDER BY
5 msg_id DESC
6 /
MSG_ID MSG_TIMESTAMP   MSG
------ --------------- --------------------------------------------------
   237 07:09:02.329000 Processed CHANGE_DEPT event for emp 7654
   236 07:09:02.329000 Entering emp_event_callback
   235 07:09:02.329000 Processed UPDATE_SAL event for emp 7654
   234 07:09:02.329000 Entering emp_event_callback
SQL> SELECT empno
2 , ename
3 , sal
4 , deptno
5 , mgr
6 FROM emp
7 WHERE empno = 7654
8 /
     EMPNO ENAME             SAL     DEPTNO        MGR
---------- ---------- ---------- ---------- ----------
      7654 MARTIN           1600         20       7566
All went according to plan. The log entries are stamped roughly 16 milliseconds after the time captured just before the commit, so the changes took just a fraction of a second to process, which certainly satisfies the criterion of “near real time” to me.
The full set of scripts for the above are available for download.
Very nice.
I’ve used this in the past to isolate a custom-built email and SMS messaging system from my users’ transactions. The application code would simply call “send_email” or “send_sms”, commit, and wouldn’t need to wait for the mail server or SMS gateway to respond.
The nice thing about AQ is that once you have set it up, it’s not hard to augment it with a range of performance and throttling characteristics, without needing to change the application. For example, I needed to throttle my system so that priority messages would be sent immediately, and non-urgent messages would wait until a quiet period before being sent.