So far in the event-based processing posts using AQ we’ve looked at callback routines and multi-subscriber queues. Next we’ll look at another useful AQ feature: rule-based subscriptions.
In the last post, the routine to subscribe to the queue, inside emp_event_management.add_subscriber, was:
dbms_aqadm.add_subscriber (queue_name => c_queue_owner || '.' || c_queue_name
                          ,subscriber => SYS.AQ$_AGENT(p_subscriber,NULL,NULL));
where:
- c_queue_owner is the owner of the queue table
- c_queue_name is the name of the queue
- p_subscriber is the subscriber name being added
The ADD_SUBSCRIBER routine is able to accept another parameter, named RULE, which contains a Boolean expression that is evaluated when a message is queued. Here’s what the Oracle 12.1 documentation says about the rule parameter:
A conditional expression based on the message properties, the message data properties and PL/SQL functions. A rule is specified as a Boolean expression using syntax similar to the WHERE clause of a SQL query. This Boolean expression can include conditions on message properties, user data properties (object payloads only), and PL/SQL or SQL functions (as specified in the where clause of a SQL query). Currently supported message properties are priority and corrid.
To specify rules on a message payload (object payload), use attributes of the object type in clauses. You must prefix each attribute with tab.user_data as a qualifier to indicate the specific column of the queue table that stores the payload. The rule parameter cannot exceed 4000 characters.
So, for our employee events example code, suppose we needed to send a message to the people involved in relocating a staff member whenever that staff member changed department. We could set up a subscriber to receive all messages and discard those that are not “change department” messages, but it would be more efficient to have the subscriber receive only the “change department” messages in the first place. To do so we would need to construct the following rule:
tab.user_data.event_operation = 'CHANGE_DEPT'
In the above expression:
- tab.user_data is a reference to the message payload object
- event_operation is an attribute of our queue payload type (see the t_emp_event definition)
- the 'CHANGE_DEPT' string is the event_operation value for a Change Department message (see the constants within the EMP_EVENT_MANAGEMENT package spec)
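As a minimal sketch of how that rule could be passed straight to DBMS_AQADM, the block below subscribes a relocation team to the queue. The subscriber name RELOCATION_TEAM is purely illustrative, and the sketch assumes the queue owner and queue name constants are visible in the EMP_EVENT_MANAGEMENT package spec. Note that the quotes around the event string have to be doubled because the rule is itself a string literal:

```sql
BEGIN
   dbms_aqadm.add_subscriber
      (queue_name => emp_event_management.c_queue_owner || '.' || emp_event_management.c_queue_name
       -- RELOCATION_TEAM is a hypothetical subscriber name used for illustration only
      ,subscriber => SYS.AQ$_AGENT('RELOCATION_TEAM',NULL,NULL)
       -- the rule is evaluated against the object payload of each message
      ,rule       => 'tab.user_data.event_operation = ''CHANGE_DEPT''');
END;
/
```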
While the rule string can be quite flexible, for our employee event code example I’m going to assume that we’ll restrict subscribers to simply choosing the employee events they wish to receive. As a consequence, the ADD_SUBSCRIBER routine in the package EMP_EVENT_MANAGEMENT will be extended to accept a collection of strings, where each string is an employee event (i.e. the event_operation of the t_emp_event type).
The first change is to create a collection type. I’ve gone with a schema-level nested table type over an associative array because a nested table can be atomically NULL, which allows a cleaner interface that is backwards compatible with the previous version of the EMP_EVENT_MANAGEMENT code:
CREATE TYPE t_emp_event_type_tab IS TABLE OF VARCHAR2(20)
/
Next we add a new parameter to the ADD_SUBSCRIBER routine in the EMP_EVENT_MANAGEMENT package spec:
PROCEDURE add_subscriber (p_subscriber IN VARCHAR2
,p_error_on_duplicate IN BOOLEAN DEFAULT TRUE
,p_emp_event_types IN t_emp_event_type_tab DEFAULT NULL);
Lastly we add the changes to the ADD_SUBSCRIBER routine in the package body:
PROCEDURE add_subscriber (p_subscriber         IN VARCHAR2
                         ,p_error_on_duplicate IN BOOLEAN DEFAULT TRUE
                         ,p_emp_event_types    IN t_emp_event_type_tab DEFAULT NULL)
IS
   ex_duplicate_subscriber EXCEPTION;
   PRAGMA EXCEPTION_INIT (ex_duplicate_subscriber,-24034);

   l_subscription_rule VARCHAR2(1000);
BEGIN

   -- construct a rule string if we've been given one or more emp event strings
   IF (p_emp_event_types IS NOT NULL AND p_emp_event_types.COUNT > 0) THEN

      -- the initial part of the rule string
      l_subscription_rule := 'tab.user_data.event_operation IN (';

      -- append a comma-separated list of emp event type strings
      FOR i IN 1..p_emp_event_types.COUNT
      LOOP
         l_subscription_rule := l_subscription_rule || '''' || p_emp_event_types(i) || ''',';
      END LOOP;

      -- replace the trailing comma with a closing bracket to complete the rule
      l_subscription_rule := SUBSTR(l_subscription_rule,1,LENGTH(l_subscription_rule)-1) || ')';

   END IF;

   dbms_aqadm.add_subscriber (queue_name => c_queue_owner || '.' || c_queue_name
                             ,subscriber => SYS.AQ$_AGENT(p_subscriber,NULL,NULL)
                             -- pass the constructed rule string as a parameter
                             -- when adding a subscriber to the queue
                             ,rule       => l_subscription_rule);

   msg ('Added ' || p_subscriber || ' subscriber to queue ' || c_queue_owner || '.' || c_queue_name);
   msg ('Subscription rule was : ' || NVL(l_subscription_rule,'<>'));

EXCEPTION
   WHEN ex_duplicate_subscriber THEN
      IF p_error_on_duplicate THEN
         RAISE;
      END IF;
END add_subscriber;
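The routine above concatenates the caller's strings directly into the rule, so an event string containing a stray quote would produce a malformed rule, or one that does more than intended. One possible way to guard against that, sketched here as a hypothetical standalone function (build_emp_event_rule is not part of the original package), is to quote each value with DBMS_ASSERT.ENQUOTE_LITERAL, which adds the surrounding quotes and raises an error if any embedded quote is not properly doubled. The sketch assumes the t_emp_event_type_tab type created earlier exists, and it sizes the variable to the 4000 character limit noted in the documentation:

```sql
CREATE OR REPLACE FUNCTION build_emp_event_rule (p_emp_event_types IN t_emp_event_type_tab)
RETURN VARCHAR2
IS
   -- 4000 is the documented maximum length of a subscription rule
   l_event_list VARCHAR2(4000);
BEGIN
   -- no event types supplied means no rule, i.e. receive every message
   IF p_emp_event_types IS NULL OR p_emp_event_types.COUNT = 0 THEN
      RETURN NULL;
   END IF;

   FOR i IN 1..p_emp_event_types.COUNT
   LOOP
      -- enquote_literal wraps the value in single quotes and rejects
      -- values containing unpaired single quotes
      l_event_list := l_event_list
                   || CASE WHEN i > 1 THEN ',' END
                   || dbms_assert.enquote_literal(p_emp_event_types(i));
   END LOOP;

   RETURN 'tab.user_data.event_operation IN (' || l_event_list || ')';
END build_emp_event_rule;
/
```

Calling build_emp_event_rule(t_emp_event_type_tab('CHANGE_DEPT')) should return the same rule string that ADD_SUBSCRIBER builds for that input, so the function could stand in for the loop inside the package body if the extra validation is wanted.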
Using the test script we can add a subscriber that is interested only in the CHANGE_DEPT events, enqueue messages of different event types and see which ones it ends up processing:
DECLARE
   l_emp_event_type_tab t_emp_event_type_tab := t_emp_event_type_tab('CHANGE_DEPT');
BEGIN
   emp_event_management.add_subscriber (p_subscriber      => 'TEST_SCRIPT'
                                       ,p_emp_event_types => l_emp_event_type_tab);
END;
/

EXEC emp_event_processing.update_sal (p_empno=>7654,p_sal=>1600)
EXEC emp_event_processing.update_dept (p_empno=>7654,p_deptno=>20)
COMMIT;

DECLARE
   ex_queue_timeout EXCEPTION;
   PRAGMA EXCEPTION_INIT (ex_queue_timeout, -25228);

   l_dequeue_options    dbms_aq.dequeue_options_t;
   l_message_properties dbms_aq.message_properties_t;
   l_message_handle     RAW(16);
   l_payload            t_emp_event;
   l_subscriber_name    VARCHAR2(30) := 'TEST_SCRIPT';
BEGIN

   l_dequeue_options.wait          := 5; -- some nominal wait for a message to arrive
   l_dequeue_options.consumer_name := l_subscriber_name;

   -- loop through the messages until we find no more
   LOOP
      BEGIN
         dbms_aq.dequeue (queue_name         => emp_event_management.c_queue_owner || '.' || emp_event_management.c_queue_name
                         ,dequeue_options    => l_dequeue_options
                         ,message_properties => l_message_properties
                         ,payload            => l_payload
                         ,msgid              => l_message_handle);

         -- here's where we would normally do our message processing, which is
         -- just to dump some details of the message
         dbms_output.put_line ('Processed ' || l_payload.event_operation || ' event for emp ' || TO_CHAR(l_payload.empno));
      EXCEPTION
         WHEN ex_queue_timeout THEN
            EXIT;
      END;
   END LOOP;

END;
/
The output of which is:
Processed CHANGE_DEPT event for emp 7654

PL/SQL procedure successfully completed.
indicating that while we queued a change department and a change salary event, our test script only processed the former, which is exactly what our subscription rule was supposed to do.
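To double-check what rule ended up attached to the subscriber, one option is to look in the data dictionary. The query below is a sketch that assumes we are connected as the queue owner and that the USER_QUEUE_SUBSCRIBERS view in your release exposes the rule text in a RULE column; check the view definition for your version before relying on it. The block after it tidies up by removing the test subscriber with DBMS_AQADM.REMOVE_SUBSCRIBER so the script can be run again:

```sql
-- show the rule registered for the test subscriber
-- (assumes USER_QUEUE_SUBSCRIBERS has a RULE column in this release)
SELECT consumer_name
,      rule
FROM   user_queue_subscribers
WHERE  consumer_name = 'TEST_SCRIPT';

-- remove the test subscriber so that the script is re-runnable
BEGIN
   dbms_aqadm.remove_subscriber
      (queue_name => emp_event_management.c_queue_owner || '.' || emp_event_management.c_queue_name
      ,subscriber => SYS.AQ$_AGENT('TEST_SCRIPT',NULL,NULL));
END;
/
```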
The full set of scripts for the above are available for download.