This chapter briefly introduces the DBMS_AQ package, and describes how to use the procedures and functions of the package.
DBMS_AQ provides procedures and functions for Tibero Advanced Queuing, the message queuing feature of the Tibero database. Advanced Queuing stores each message persistently in a database table, so queue table data can be queried directly with SQL.
A queue serves either a single consumer or multiple consumers. A message enqueued to a single-consumer queue can be dequeued only by that consumer. A message enqueued to a multiple-consumer queue is addressed to the default subscribers and the specified receivers, and each consumer can dequeue the messages that have been enqueued for it.
A tbCLI client can register itself to receive an event alert when a message is enqueued to a queue in which it is interested. If a callback function is specified during the registration, the callback can be executed whenever the client receives an alert.
Only the RAW message payload type is supported. For more information about how to register alerts and execute callback functions in tbCLI, refer to the Tibero tbCLI Guide.
Constants defined in DBMS_AQ are:
Constant | Type | Value |
---|---|---|
IMMEDIATE | PLS_INTEGER | 0 |
ON_COMMIT | PLS_INTEGER | 1 |
BROWSE | BINARY_INTEGER | 1 |
LOCKED | BINARY_INTEGER | 2 |
REMOVE | BINARY_INTEGER | 3 |
REMOVE_NODATA | BINARY_INTEGER | 4 |
FIRST_MESSAGE | BINARY_INTEGER | 1 |
NEXT_MESSAGE | BINARY_INTEGER | 3 |
FOREVER | BINARY_INTEGER | -1 |
NO_WAIT | BINARY_INTEGER | 0 |
NAMESPACE_AQ | BINARY_INTEGER | 1 |
This section describes the types provided by the DBMS_AQ package, in alphabetical order.
Specifies a message producer or consumer.
Details about the AQ$_AGENT type are as follows:
Prototype
    TYPE AQ$_AGENT IS RECORD (
        name     VARCHAR2(30),
        address  VARCHAR2(1024) DEFAULT NULL,
        protocol NUMBER         DEFAULT 0
    );
Field
Field | Description |
---|---|
name | Producer or consumer name. The same rule as for object names applies. |
address | Unused in the current version. |
protocol | Unused in the current version. |
Specifies a list of agents that will receive a message. This type is used only for a multiple-consumer queue.
Details about the AQ$_RECIPIENT_LIST_T type are as follows:
Prototype
TYPE AQ$_RECIPIENT_LIST_T IS TABLE OF AQ$_AGENT INDEX BY BINARY_INTEGER;
Specifies options used during dequeue.
Details about the DEQUEUE_OPTIONS_T type are as follows:
Prototype
    TYPE DEQUEUE_OPTIONS_T IS RECORD (
        consumer_name VARCHAR2(30)   DEFAULT NULL,
        dequeue_mode  BINARY_INTEGER DEFAULT REMOVE,
        navigation    BINARY_INTEGER DEFAULT NEXT_MESSAGE,
        visibility    BINARY_INTEGER DEFAULT ON_COMMIT,
        wait          BINARY_INTEGER DEFAULT FOREVER,
        msgid         RAW(16)        DEFAULT NULL,
        correlation   VARCHAR2(128)  DEFAULT NULL,
        deq_condition VARCHAR2(4000) DEFAULT NULL
    );
Field
Field | Description |
---|---|
consumer_name | Consumer name. Only messages sent to this consumer can be dequeued. This field must be NULL for a single-consumer queue. |
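dequeue_mode | Lock method used for dequeue. Options are the constants BROWSE, LOCKED, REMOVE (default), and REMOVE_NODATA. |
navigation | Position of the message to fetch. Options are the constants FIRST_MESSAGE and NEXT_MESSAGE (default). |
visibility | Option to include the dequeue operation in the current transaction. If dequeue_mode is BROWSE, this field is ignored. Options are the constants ON_COMMIT (default) and IMMEDIATE. |
wait | Waiting time when there is no message that meets the search criteria. Options include the constants FOREVER (default) and NO_WAIT. |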
msgid | Identifier of the message to dequeue. |
correlation | Correlation identifier of the message to dequeue. Pattern comparison characters, such as a percent sign (%) and underscore (_), can be used. |
deq_condition | Conditional expression for a message or message data property. This is a true or false expression like an expression in the SQL WHERE clause. A message property includes the priority, corrid, and other columns of the queue table. |
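The fields above can be combined to inspect a queue without consuming anything. The following is a minimal sketch, assuming that my_q is an existing single-consumer queue and that at least one message was enqueued with a correlation identifier starting with abc. This chapter does not state what happens when NO_WAIT finds no matching message; the handler below assumes an error is raised in that case.

```sql
DECLARE
    dequeue_options    DBMS_AQ.dequeue_options_t;
    message_properties DBMS_AQ.message_properties_t;
    message_handle     RAW(16);
    message            RAW(1000);
BEGIN
    -- Read the message but leave it in the queue.
    dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
    -- Do not block when no message meets the search criteria.
    dequeue_options.wait := DBMS_AQ.NO_WAIT;
    -- Pattern match on the correlation identifier ('%' matches any suffix).
    dequeue_options.correlation := 'abc%';

    DBMS_AQ.DEQUEUE(
        queue_name         => 'my_q',   -- assumed single-consumer queue
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle);

    DBMS_OUTPUT.PUT_LINE('msgid: ' || message_handle ||
                         ', priority: ' || message_properties.priority);
EXCEPTION
    WHEN OTHERS THEN
        -- Assumption: an error is raised when nothing matches with NO_WAIT.
        DBMS_OUTPUT.PUT_LINE('dequeue failed: ' || SQLERRM);
END;
/
```

Because consumer_name keeps its NULL default, the same block should not be used unchanged against a multiple-consumer queue.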
Specifies options used during enqueue.
Details about the ENQUEUE_OPTIONS_T type are as follows:
Prototype
TYPE ENQUEUE_OPTIONS_T IS RECORD ( visibility BINARY_INTEGER DEFAULT ON_COMMIT );
Field
Field | Description |
---|---|
visibility | Option to include the enqueue operation in the current transaction. Options are the constants ON_COMMIT (default) and IMMEDIATE. |
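As an illustration of setting this option, the sketch below enqueues one message with visibility set to IMMEDIATE instead of the ON_COMMIT default. It assumes that my_q is an existing single-consumer queue, and it assumes, from the constant name only, that IMMEDIATE makes the enqueue independent of the current transaction; this chapter does not spell out that behavior.

```sql
DECLARE
    enqueue_options    DBMS_AQ.enqueue_options_t;
    message_properties DBMS_AQ.message_properties_t;
    message_handle     RAW(16);
BEGIN
    -- Override the ON_COMMIT default of the visibility field.
    -- Assumption: IMMEDIATE means the enqueue does not wait for COMMIT.
    enqueue_options.visibility := DBMS_AQ.IMMEDIATE;

    DBMS_AQ.ENQUEUE(
        queue_name         => 'my_q',   -- assumed single-consumer queue
        enqueue_options    => enqueue_options,
        message_properties => message_properties,
        payload            => HEXTORAW('FFFF'),
        msgid              => message_handle);

    DBMS_OUTPUT.PUT_LINE('msgid: ' || message_handle);
END;
/
```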
Specifies the properties of each message. A message is enqueued with the properties set and the properties are returned when dequeued.
Details about the MESSAGE_PROPERTIES_T type are as follows:
Prototype
    TYPE MESSAGE_PROPERTIES_T IS RECORD (
        priority       BINARY_INTEGER NOT NULL DEFAULT 1,
        correlation    VARCHAR2(128) DEFAULT NULL,
        recipient_list AQ$_RECIPIENT_LIST_T,
        enqueue_time   DATE
    );
Field
Field | Description |
---|---|
priority | Message priority. A smaller number means a higher priority. Any integer, including a negative one, can be used. |
correlation | Correlation identifier specified by the producer when the message is enqueued. |
recipient_list | Index-by table of AQ$_AGENT records that directly specifies the message receivers. This field can be used only for a multiple-consumer queue. If it is not set, only default subscribers are receivers. This field is not returned to consumers during dequeue. |
enqueue_time | Point in time when a message is enqueued. The value is determined by the system and cannot be specified by the user when the message is enqueued. |
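To show how recipient_list is populated, the following sketch addresses one message to two named consumers. It assumes that my_multi_q is an existing multiple-consumer queue and that the record types are referenced through the package as DBMS_AQ.aq$_agent and DBMS_AQ.aq$_recipient_list_t. RED appears in this chapter's other examples, while GREEN is a hypothetical second consumer.

```sql
DECLARE
    enqueue_options    DBMS_AQ.enqueue_options_t;
    message_properties DBMS_AQ.message_properties_t;
    recipients         DBMS_AQ.aq$_recipient_list_t;
    agent              DBMS_AQ.aq$_agent;
    message_handle     RAW(16);
BEGIN
    -- Only the name field is used; address and protocol keep their defaults.
    agent.name := 'RED';
    recipients(1) := agent;
    agent.name := 'GREEN';              -- hypothetical consumer name
    recipients(2) := agent;

    -- Attach the receivers to the message properties.
    message_properties.recipient_list := recipients;
    message_properties.correlation := 'abc';

    DBMS_AQ.ENQUEUE(
        queue_name         => 'my_multi_q',
        enqueue_options    => enqueue_options,
        message_properties => message_properties,
        payload            => HEXTORAW('FFFF'),
        msgid              => message_handle);

    DBMS_OUTPUT.PUT_LINE('msgid: ' || message_handle);
END;
/
```

Each listed consumer would then dequeue the message by setting consumer_name in dequeue_options_t to its own name.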
Specifies the properties of multiple messages when dbms_aq.enqueue_array or dbms_aq.dequeue_array is called. The message_properties_array_t varray must contain one element for each payload specified in payload_array.
Details about the MESSAGE_PROPERTIES_ARRAY_T type are as follows:
Prototype
TYPE MESSAGE_PROPERTIES_ARRAY_T IS VARRAY (2147483647) OF MESSAGE_PROPERTIES_T;
This section describes the procedures and functions provided by the DBMS_AQ package, in alphabetical order.
Dequeues a message from a target queue. The message is identified according to the values of consumer_name, msgid, correlation, or deq_condition in dequeue_options_t. For a description of each field, refer to “5.2.3. DEQUEUE_OPTIONS_T” in “5.2. Types”.
The dequeue order is determined by the sort order specified when the queue table is created, unless it is overridden by the msgid or correlation options in dequeue_options_t. The consistent read rule applies to enqueue and dequeue. For example, a message enqueued after a BROWSE starts may not be visible to that BROWSE.
The default value of the navigation field is NEXT_MESSAGE. When dequeue is executed repeatedly with the default value, messages are returned sequentially from a snapshot taken when the first message is dequeued. A message enqueued after that point can therefore be consumed only after all the messages in the snapshot have been consumed. This is usually acceptable, but FIRST_MESSAGE must be used when each dequeue has to return the current first message of the queue, for example because of priority. If a higher-priority message is enqueued while previously enqueued messages are being processed, FIRST_MESSAGE must be used to process that message first.
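The priority case described above can be sketched as a loop that asks for the first message of the queue on every call. This is an illustration under assumptions: my_q is an existing single-consumer queue, an error is assumed to be raised when NO_WAIT finds no message (this chapter does not state the behavior), and the limit of 100 iterations is an arbitrary safeguard.

```sql
DECLARE
    dequeue_options    DBMS_AQ.dequeue_options_t;
    message_properties DBMS_AQ.message_properties_t;
    message_handle     RAW(16);
    message            RAW(1000);
BEGIN
    -- Re-evaluate the head of the queue on every dequeue, so a newly
    -- enqueued higher-priority message is returned before older ones.
    dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
    -- Stop instead of blocking once the queue is drained.
    dequeue_options.wait := DBMS_AQ.NO_WAIT;

    FOR i IN 1..100 LOOP                -- arbitrary upper bound
        BEGIN
            DBMS_AQ.DEQUEUE(
                queue_name         => 'my_q',
                dequeue_options    => dequeue_options,
                message_properties => message_properties,
                payload            => message,
                msgid              => message_handle);
            DBMS_OUTPUT.PUT_LINE('priority ' || message_properties.priority ||
                                 ', msgid: ' || message_handle);
        EXCEPTION
            WHEN OTHERS THEN
                -- Assumption: an error signals that no message is left.
                EXIT;
        END;
    END LOOP;

    -- visibility defaults to ON_COMMIT, so the removals take effect here.
    COMMIT;
END;
/
```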
Details about the DEQUEUE procedure are as follows:
Prototype
    DBMS_AQ.DEQUEUE (
        queue_name         IN  VARCHAR2,
        dequeue_options    IN  DEQUEUE_OPTIONS_T,
        message_properties OUT MESSAGE_PROPERTIES_T,
        payload            OUT RAW,
        msgid              OUT RAW
    );
Parameter
Parameter | Description |
---|---|
queue_name | Name of the queue from which to dequeue a message. |
dequeue_options | Options that apply to all messages to dequeue. For more information, refer to “5.2.3. DEQUEUE_OPTIONS_T” in “5.2. Types”. |
message_properties | Properties of a message to return. For more information, refer to “5.2.5. MESSAGE_PROPERTIES_T” in “5.2. Types”. |
payload | User-specified payload. Currently, only the RAW type is supported. |
msgid | Identifier of a dequeued message. |
Example
    DECLARE
        dequeue_options    DBMS_AQ.dequeue_options_t;
        message_properties DBMS_AQ.message_properties_t;
        message_handle     RAW(16);
        message            RAW(1000);
    BEGIN
        -- Dequeue only messages sent to the consumer RED.
        dequeue_options.consumer_name := 'RED';
        -- Filter on a message property, as in a SQL WHERE clause.
        dequeue_options.deq_condition := 'enq_time is not null';
        DBMS_AQ.DEQUEUE(
            queue_name         => 'my_multi_q',
            dequeue_options    => dequeue_options,
            message_properties => message_properties,
            payload            => message,
            msgid              => message_handle);
        DBMS_OUTPUT.PUT_LINE('msgid: ' || message_handle);
    END;
    /
Dequeues multiple messages in a single operation, and returns the messages using a payload array, a message property array, and a message identifier array. The return value is the number of successfully dequeued messages.
The waiting time specified in dequeue_options_t applies only when the queue contains no message to dequeue. If the queue contains one or more messages to dequeue, DEQUEUE_ARRAY dequeues up to array_size messages and then returns immediately.
The msgid cannot be specified through dequeue_options_t. For information about navigation parameters, refer to “5.2.3. DEQUEUE_OPTIONS_T” in “5.2. Types”.
Details about the DEQUEUE_ARRAY function are as follows:
Prototype
    DBMS_AQ.DEQUEUE_ARRAY (
        queue_name               IN  VARCHAR2,
        dequeue_options          IN  DEQUEUE_OPTIONS_T,
        array_size               IN  PLS_INTEGER,
        message_properties_array OUT MESSAGE_PROPERTIES_ARRAY_T,
        payload_array            OUT PAYLOAD_ARRAY_T,
        msgid_array              OUT MSGID_ARRAY_T
    ) RETURN PLS_INTEGER;
Parameter
Parameter | Description |
---|---|
queue_name | Name of the queue from which to dequeue messages. |
dequeue_options | Options that apply to all messages to dequeue. For more information, refer to “5.2.3. DEQUEUE_OPTIONS_T” in “5.2. Types”. |
array_size | Number of messages to dequeue. |
message_properties_array | Property array of messages to return. For more information, refer to “5.2.6. MESSAGE_PROPERTIES_ARRAY_T” in “5.2. Types”. |
payload_array | Payload array of messages to return. For more information, refer to “5.2.8. PAYLOAD_ARRAY_T” in “5.2. Types”. |
msgid_array | Identifier array of dequeued messages. For more information, refer to “5.2.7. MSGID_ARRAY_T” in “5.2. Types”. |
Example
    DECLARE
        dequeue_options DBMS_AQ.dequeue_options_t;
        msg_prop_array  DBMS_AQ.message_properties_array_t;
        payload_array   DBMS_AQ.payload_array_t;
        msgid_array     DBMS_AQ.msgid_array_t;
        retval          PLS_INTEGER;
    BEGIN
        dequeue_options.consumer_name := 'RED';
        -- Dequeue up to 10 messages in a single call.
        retval := DBMS_AQ.DEQUEUE_ARRAY(
            queue_name               => 'my_multi_q',
            dequeue_options          => dequeue_options,
            array_size               => 10,
            message_properties_array => msg_prop_array,
            payload_array            => payload_array,
            msgid_array              => msgid_array);
        DBMS_OUTPUT.PUT_LINE('Number of messages dequeued: ' || retval);
        -- Print the identifier of each dequeued message.
        FOR i IN 1..retval LOOP
            DBMS_OUTPUT.PUT_LINE('msgid(' || i || '): ' || msgid_array(i));
        END LOOP;
    END;
    /
Enqueues a message to the target queue. For a multiple-consumer queue, this procedure returns an error if no default subscriber is registered and no receiver is specified, because the message would have no destination.
Details about the ENQUEUE procedure are as follows:
Prototype
    DBMS_AQ.ENQUEUE (
        queue_name         IN  VARCHAR2,
        enqueue_options    IN  ENQUEUE_OPTIONS_T,
        message_properties IN  MESSAGE_PROPERTIES_T,
        payload            IN  RAW,
        msgid              OUT RAW
    );
Parameter
Parameter | Description |
---|---|
queue_name | Name of the queue to enqueue a message to. |
enqueue_options | Options that apply to all messages to enqueue. For more information, refer to “5.2.4. ENQUEUE_OPTIONS_T” in “5.2. Types”. |
message_properties | Properties of the message to enqueue. For more information, refer to “5.2.5. MESSAGE_PROPERTIES_T” in “5.2. Types”. |
payload | User-specified payload. Currently, only the RAW type is supported. |
msgid | Message identifier assigned by the system. The value is unique and can be used to identify a message to dequeue. |
Example
    DECLARE
        enqueue_options    DBMS_AQ.enqueue_options_t;
        message_properties DBMS_AQ.message_properties_t;
        message_handle     RAW(16);
    BEGIN
        -- Set the priority and the correlation identifier of the message.
        message_properties.priority    := 2;
        message_properties.correlation := 'abc';
        DBMS_AQ.ENQUEUE(
            queue_name         => 'my_q',
            enqueue_options    => enqueue_options,
            message_properties => message_properties,
            payload            => HEXTORAW('FFFF'),
            msgid              => message_handle);
        DBMS_OUTPUT.PUT_LINE('msgid: ' || message_handle);
    END;
    /
Enqueues multiple payloads, each with its corresponding message properties, in a single operation, and returns an array that contains the identifiers of the enqueued messages. The return value is the number of successfully enqueued messages.
Details about the ENQUEUE_ARRAY function are as follows:
Prototype
    DBMS_AQ.ENQUEUE_ARRAY (
        queue_name               IN  VARCHAR2,
        enqueue_options          IN  ENQUEUE_OPTIONS_T,
        array_size               IN  PLS_INTEGER,
        message_properties_array IN  MESSAGE_PROPERTIES_ARRAY_T,
        payload_array            IN  PAYLOAD_ARRAY_T,
        msgid_array              OUT MSGID_ARRAY_T
    ) RETURN PLS_INTEGER;
Parameter
Parameter | Description |
---|---|
queue_name | Name of the queue to enqueue messages to. |
enqueue_options | Options that apply to all messages to enqueue. For more information, refer to “5.2.4. ENQUEUE_OPTIONS_T” in “5.2. Types”. |
array_size | Number of messages to enqueue. |
message_properties_array | Property array of messages to enqueue. For more information, refer to “5.2.6. MESSAGE_PROPERTIES_ARRAY_T” in “5.2. Types”. |
payload_array | Payload array of messages to enqueue. For more information, refer to “5.2.8. PAYLOAD_ARRAY_T” in “5.2. Types”. |
msgid_array | Identifier array of enqueued messages. For more information, refer to “5.2.7. MSGID_ARRAY_T” in “5.2. Types”. |
Example
    DECLARE
        v               DBMS_AQ.payload_array_t;
        msg_prop_array  DBMS_AQ.message_properties_array_t;
        retval          PLS_INTEGER;
        msgid_array     DBMS_AQ.msgid_array_t;
        enqueue_options DBMS_AQ.enqueue_options_t;
    BEGIN
        -- Six payloads to enqueue in one call.
        v := DBMS_AQ.payload_array_t(
            HEXTORAW('FFFF'), HEXTORAW('EEEE'), HEXTORAW('EEEE'),
            HEXTORAW('EEEE'), HEXTORAW('EEEE'), HEXTORAW('EEEE'));
        -- One message property element per payload.
        msg_prop_array := DBMS_AQ.message_properties_array_t();
        msg_prop_array.EXTEND(6);
        FOR i IN 1..6 LOOP
            msg_prop_array(i).priority := 7 - i;
        END LOOP;
        retval := DBMS_AQ.ENQUEUE_ARRAY(
            queue_name               => 'my_multi_q',
            enqueue_options          => enqueue_options,
            array_size               => 6,
            message_properties_array => msg_prop_array,
            payload_array            => v,
            msgid_array              => msgid_array);
        DBMS_OUTPUT.PUT_LINE('processed: ' || retval);
        -- Print the identifier of each enqueued message.
        FOR i IN 1..retval LOOP
            DBMS_OUTPUT.PUT_LINE('msgid(' || i || '): ' || msgid_array(i));
        END LOOP;
    END;
    /