In the last post we looked at asynchronous processing using a queue and a callback routine. Event-based processing is very powerful and flexible. Oracle’s Advanced Queues provide many features that are useful in setting up event-based processing. This post will look at multi-subscriber queues.
The queue created for the previous post was a single-subscriber queue: a message can be dequeued just once, so the queue is pretty much limited to a single dequeue process. Queues can also be configured to support multiple subscribers, allowing us to initiate multiple business processes from the one event. In this scenario, each subscriber is able to dequeue the same message.
The changes to our previous code to support a multi-subscriber queue are relatively few. When creating the queue table we need to specify that it will support multiple subscribers. In the routine SETUP inside the package EMP_EVENT_MANAGEMENT you will find the following change:
dbms_aqadm.create_queue_table (queue_table        => c_queue_owner || '.' || c_queue_table
                              ,queue_payload_type => c_queue_payload
                              ,multiple_consumers => TRUE);
We also have routines to add and remove queue subscribers exposed in the package spec:
PROCEDURE add_subscriber (p_subscriber         IN VARCHAR2
                         ,p_error_on_duplicate IN BOOLEAN DEFAULT TRUE);

PROCEDURE remove_subscriber (p_subscriber         IN VARCHAR2
                            ,p_error_on_not_exist IN BOOLEAN DEFAULT TRUE);
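To give a feel for what sits behind that spec, here is a minimal sketch of how the two routines might be implemented in the package body. It is not the code from the download: the exception names and the way the two BOOLEAN flags are handled are my assumptions, as are the error numbers, which as far as I'm aware are ORA-24034 (already a subscriber) and ORA-24035 (not a subscriber). The constants c_queue_owner and c_queue_name are the ones used elsewhere in the package.

```plsql
-- Sketch only: assumed package-body implementations, not the downloadable code
PROCEDURE add_subscriber (p_subscriber         IN VARCHAR2
                         ,p_error_on_duplicate IN BOOLEAN DEFAULT TRUE)
IS
   -- assumed: ORA-24034 is raised when the subscriber already exists
   ex_already_subscriber EXCEPTION;
   PRAGMA EXCEPTION_INIT (ex_already_subscriber, -24034);
BEGIN
   dbms_aqadm.add_subscriber (queue_name => c_queue_owner || '.' || c_queue_name
                             ,subscriber => sys.aq$_agent (p_subscriber, NULL, NULL));
EXCEPTION
   WHEN ex_already_subscriber THEN
      -- only surface the error if the caller asked for it
      IF p_error_on_duplicate THEN
         RAISE;
      END IF;
END add_subscriber;

PROCEDURE remove_subscriber (p_subscriber         IN VARCHAR2
                            ,p_error_on_not_exist IN BOOLEAN DEFAULT TRUE)
IS
   -- assumed: ORA-24035 is raised when the agent is not a subscriber
   ex_not_subscriber EXCEPTION;
   PRAGMA EXCEPTION_INIT (ex_not_subscriber, -24035);
BEGIN
   dbms_aqadm.remove_subscriber (queue_name => c_queue_owner || '.' || c_queue_name
                                ,subscriber => sys.aq$_agent (p_subscriber, NULL, NULL));
EXCEPTION
   WHEN ex_not_subscriber THEN
      -- only surface the error if the caller asked for it
      IF p_error_on_not_exist THEN
         RAISE;
      END IF;
END remove_subscriber;
```

The optional flags let a setup script call these routines repeatedly without having to care whether the subscriber is already there.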
The package body has an additional routine for removing all subscribers from the queue, which is invoked by the teardown routine to clean things up properly.
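One way such a clean-up routine could be written is sketched below. The procedure name remove_all_subscribers is hypothetical and the body is my assumption rather than the downloadable code. It relies on DBMS_AQADM.QUEUE_SUBSCRIBERS, which returns the queue's subscribers as a collection of agents.

```plsql
-- Sketch only: hypothetical routine name, assumed implementation
PROCEDURE remove_all_subscribers
IS
   l_queue_name  VARCHAR2(100) := c_queue_owner || '.' || c_queue_name;
   l_subscribers dbms_aqadm.aq$_subscriber_list_t;
   l_idx         BINARY_INTEGER;
BEGIN
   -- fetch the current subscribers of the queue
   l_subscribers := dbms_aqadm.queue_subscribers (queue_name => l_queue_name);

   -- the collection is index-by, so walk it with FIRST/NEXT
   l_idx := l_subscribers.FIRST;
   WHILE l_idx IS NOT NULL
   LOOP
      dbms_aqadm.remove_subscriber (queue_name => l_queue_name
                                   ,subscriber => l_subscribers(l_idx));
      l_idx := l_subscribers.NEXT(l_idx);
   END LOOP;
END remove_all_subscribers;
```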
When creating a process that will consume messages from a queue, the process will need to:
- add itself as a subscriber of the queue
- specify its subscriber name when dequeuing messages
When registering a callback routine, the queue subscriber name is specified after the name of the callback routine, as shown below:
PROCEDURE register_event_callback
IS
BEGIN
   dbms_aq.register
      (sys.aq$_reg_info_list
         (sys.aq$_reg_info
            -- the next bit needs to be "queue_name : subscriber"
            (c_queue_owner || '.' || c_queue_name || ':' || c_callback_subscriber
            ,DBMS_AQ.NAMESPACE_AQ
            -- this is the routine that will get called when a message is queued
            -- I'm going to assume it's in the same schema as the queue
            ,'plsql://' || c_queue_owner || '.emp_event_processing.emp_event_callback'
            ,NULL
            )
         ),
      1
      );
   msg('Callback registered');
END register_event_callback;
The unregister routine is similarly modified to include the subscriber.
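For completeness, a sketch of what the modified unregister routine might look like follows. The procedure name unregister_event_callback and the log message are my assumptions. The registration details passed to DBMS_AQ.UNREGISTER simply mirror those used when registering, including the ':' || c_callback_subscriber suffix.

```plsql
-- Sketch only: assumed routine name, mirroring the registration details
PROCEDURE unregister_event_callback
IS
BEGIN
   dbms_aq.unregister
      (sys.aq$_reg_info_list
         (sys.aq$_reg_info
            -- same "queue_name : subscriber" format as used for registration
            (c_queue_owner || '.' || c_queue_name || ':' || c_callback_subscriber
            ,DBMS_AQ.NAMESPACE_AQ
            ,'plsql://' || c_queue_owner || '.emp_event_processing.emp_event_callback'
            ,NULL
            )
         ),
      1
      );
   msg('Callback unregistered');
END unregister_event_callback;
```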
Looking at the callback routine itself, contained in the EMP_EVENT_PROCESSING package, we need to specify the subscriber when dequeuing the message. Helpfully, the subscriber name is provided in the descriptor passed into the callback routine:
PROCEDURE emp_event_callback (context  RAW
                             ,reginfo  SYS.AQ$_REG_INFO
                             ,descr    SYS.AQ$_DESCRIPTOR
                             ,payload  RAW
                             ,payloadl NUMBER)
IS
   l_dequeue_options    dbms_aq.dequeue_options_t;
   l_message_properties dbms_aq.message_properties_t;
   l_message_handle     RAW(16);
   l_payload            t_emp_event;
BEGIN
   msg ('Entering emp_event_callback');
   -- dequeue the message specified
   l_dequeue_options.msgid         := descr.msg_id;
   l_dequeue_options.wait          := dbms_aq.no_wait;
   l_dequeue_options.consumer_name := descr.consumer_name;
   dbms_aq.dequeue (queue_name         => descr.queue_name
                   ,dequeue_options    => l_dequeue_options
                   ,message_properties => l_message_properties
                   ,payload            => l_payload
                   ,msgid              => l_message_handle);
   process_message (p_emp_event => l_payload);
   msg ('Processed ' || l_payload.event_operation || ' event for emp ' || TO_CHAR(l_payload.empno));
   -- commit here to ensure we remove the message
   COMMIT;
END emp_event_callback;
All of these changes are required simply to get the previous code working on a multi-subscriber queue. We still don’t have a second subscriber. We could set up a second callback routine or simply test our queue using a test script, such as the following:
EXEC emp_event_management.add_subscriber (p_subscriber => 'TEST_SCRIPT')
EXEC emp_event_processing.update_sal (p_empno=>7654,p_sal=>1600)
EXEC emp_event_processing.update_dept (p_empno=>7654,p_deptno=>20)
COMMIT;

DECLARE
   ex_queue_timeout     EXCEPTION;
   PRAGMA EXCEPTION_INIT (ex_queue_timeout, -25228);
   l_dequeue_options    dbms_aq.dequeue_options_t;
   l_message_properties dbms_aq.message_properties_t;
   l_message_handle     RAW(16);
   l_payload            t_emp_event;
   l_subscriber_name    VARCHAR2(30) := 'TEST_SCRIPT';
BEGIN
   l_dequeue_options.wait          := 5; -- some nominal wait for a message to arrive
   l_dequeue_options.consumer_name := l_subscriber_name;

   -- loop through the messages until we find no more
   LOOP
      BEGIN
         dbms_aq.dequeue (queue_name         => emp_event_management.c_queue_owner || '.' || emp_event_management.c_queue_name
                         ,dequeue_options    => l_dequeue_options
                         ,message_properties => l_message_properties
                         ,payload            => l_payload
                         ,msgid              => l_message_handle);

         -- here's where we would normally do our message processing, which is
         -- just to dump some details of the message
         dbms_output.put_line ('Processed ' || l_payload.event_operation || ' event for emp ' || TO_CHAR(l_payload.empno));
      EXCEPTION
         WHEN ex_queue_timeout THEN
            EXIT;
      END;
   END LOOP;
END;
/

-- commit here to ensure we remove the message
COMMIT;
The output from the main part of the above script is:
Processed UPDATE_SAL event for emp 7654
Processed CHANGE_DEPT event for emp 7654
and if we take a look at the MSG_LOG table where our callback routine logs its processing activity:
SQL> SELECT *
  2  FROM   msg_log
  3  WHERE  msg_timestamp >= SYSTIMESTAMP - INTERVAL '2' MINUTE
  4  ORDER  BY
  5         msg_id DESC
  6  /

MSG_ID MSG_TIMESTAMP   MSG
------ --------------- --------------------------------------------------
   290 06:44:31.481000 Processed CHANGE_DEPT event for emp 7654
   289 06:44:31.481000 Entering emp_event_callback
   288 06:44:31.481000 Processed UPDATE_SAL event for emp 7654
   287 06:44:31.481000 Entering emp_event_callback
we can see that it too has processed the messages.
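If you want to confirm which subscribers the queue currently has, something along the lines of the following sketch could be used. It is not part of the original scripts. It assumes the constants c_queue_owner and c_queue_name are exposed in the EMP_EVENT_MANAGEMENT package spec, as the test script above implies, and that server output is enabled in the session.

```plsql
-- Sketch only: list the names of the queue's current subscribers
DECLARE
   l_subscribers dbms_aqadm.aq$_subscriber_list_t;
   l_idx         BINARY_INTEGER;
BEGIN
   l_subscribers := dbms_aqadm.queue_subscribers
                       (queue_name => emp_event_management.c_queue_owner || '.' || emp_event_management.c_queue_name);

   -- the collection is index-by, so walk it with FIRST/NEXT
   l_idx := l_subscribers.FIRST;
   WHILE l_idx IS NOT NULL
   LOOP
      dbms_output.put_line ('Subscriber: ' || l_subscribers(l_idx).name);
      l_idx := l_subscribers.NEXT(l_idx);
   END LOOP;
END;
/
```

After running the test script we would expect to see both the callback subscriber and TEST_SCRIPT listed.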
The full set of scripts for the above is available for download.