Chapter 5. DBMS_AQ

Table of Contents

5.1. Overview
5.2. Types
5.2.1. AQ$_AGENT
5.2.2. AQ$_RECIPIENT_LIST_T
5.2.3. DEQUEUE_OPTIONS_T
5.2.4. ENQUEUE_OPTIONS_T
5.2.5. MESSAGE_PROPERTIES_T
5.2.6. MESSAGE_PROPERTIES_ARRAY_T
5.2.7. MSGID_ARRAY_T
5.2.8. PAYLOAD_ARRAY_T
5.3. Procedures and Functions
5.3.1. DEQUEUE
5.3.2. DEQUEUE_ARRAY
5.3.3. ENQUEUE
5.3.4. ENQUEUE_ARRAY

This chapter briefly introduces the DBMS_AQ package and describes how to use its procedures and functions.

5.1. Overview

DBMS_AQ provides procedures and functions for Tibero Advanced Queuing, the message queuing feature of the Tibero database. Advanced Queuing stores messages persistently in a database table, so queue table data can be queried directly with SQL.

A queue can serve either a single consumer or multiple consumers. A message enqueued to a single-consumer queue can be dequeued only by that consumer. A message enqueued to a multiple-consumer queue is intended for the default subscribers and any specified receivers, and each consumer can dequeue the messages that were enqueued for it.

A tbCLI client can register to receive an event alert whenever a message is enqueued to a queue in which it is interested. If a callback function is specified during registration, the function can be executed each time the client receives an alert.

Note

Only the RAW message payload type is supported. For more information about how to register alerts and execute callback functions in tbCLI, refer to the Tibero tbCLI Guide.

Constants defined in DBMS_AQ are:

  • IMMEDIATE

    IMMEDIATE CONSTANT PLS_INTEGER := 0;
  • ON_COMMIT

    ON_COMMIT CONSTANT PLS_INTEGER := 1;
  • BROWSE

    BROWSE CONSTANT BINARY_INTEGER := 1;
  • LOCKED

    LOCKED CONSTANT BINARY_INTEGER := 2;
  • REMOVE

    REMOVE CONSTANT BINARY_INTEGER := 3;
  • REMOVE_NODATA

    REMOVE_NODATA CONSTANT BINARY_INTEGER := 4;
  • FIRST_MESSAGE

    FIRST_MESSAGE CONSTANT BINARY_INTEGER := 1;
  • NEXT_MESSAGE

    NEXT_MESSAGE CONSTANT BINARY_INTEGER := 3;
  • FOREVER

    FOREVER CONSTANT BINARY_INTEGER := -1;
  • NO_WAIT

    NO_WAIT CONSTANT BINARY_INTEGER := 0;
  • NAMESPACE_AQ

    NAMESPACE_AQ CONSTANT BINARY_INTEGER := 1;

5.2. Types

This section describes the types provided by the DBMS_AQ package, in alphabetical order.

5.2.1. AQ$_AGENT

Specifies a message producer or consumer.

Details about the AQ$_AGENT type are as follows:

  • Prototype

    TYPE AQ$_AGENT IS RECORD 
    (
         name     VARCHAR2(30),
         address  VARCHAR2(1024) DEFAULT NULL,
         protocol NUMBER DEFAULT 0
    );
  • Field

    Field     Description
    name      Producer or consumer name. The same rule as for object names applies.
    address   Unused in the current version.
    protocol  Unused in the current version.

5.2.2. AQ$_RECIPIENT_LIST_T

Specifies a list of agents that will receive a message. This type is used only for a multiple-consumer queue.

Details about the AQ$_RECIPIENT_LIST_T type are as follows:

  • Prototype

    TYPE AQ$_RECIPIENT_LIST_T IS TABLE OF AQ$_AGENT
            INDEX BY BINARY_INTEGER;

5.2.3. DEQUEUE_OPTIONS_T

Specifies options used during dequeue.

Details about the DEQUEUE_OPTIONS_T type are as follows:

  • Prototype

    TYPE DEQUEUE_OPTIONS_T IS RECORD 
    (
         consumer_name     VARCHAR2(30)    DEFAULT NULL,
         dequeue_mode      BINARY_INTEGER  DEFAULT REMOVE,
         navigation        BINARY_INTEGER  DEFAULT NEXT_MESSAGE,
         visibility        BINARY_INTEGER  DEFAULT ON_COMMIT,
         wait              BINARY_INTEGER  DEFAULT FOREVER,
         msgid             RAW(16)         DEFAULT NULL,
         correlation       VARCHAR2(128)   DEFAULT NULL,
         deq_condition     VARCHAR2(4000)  DEFAULT NULL
    );
  • Field

    Field          Description
    consumer_name  Consumer name. Only messages sent to this consumer can be dequeued. This field must be NULL for a single-consumer queue.
    dequeue_mode   Lock method used for dequeue. Options are:
                   • BROWSE: read a message without acquiring a lock. This is equivalent to a SELECT statement.
                   • LOCKED: read a message and acquire a lock on it. The lock is held until the transaction ends. This is equivalent to a SELECT FOR UPDATE statement.
                   • REMOVE: read the message and then remove it. (Default value)
                   • REMOVE_NODATA: remove a message without reading it.
    navigation     Position of the message to fetch. Options are:
                   • NEXT_MESSAGE: fetch the next message after the current cursor position. (Default value)
                   • FIRST_MESSAGE: fetch the first message that meets the search criteria. This option resets the cursor position.
    visibility     Whether the dequeue operation is included in the current transaction. This field is ignored if dequeue_mode is BROWSE. Options are:
                   • ON_COMMIT: include the dequeue operation in the current transaction. (Default value)
                   • IMMEDIATE: do not include the dequeue operation in the current transaction. The operation is processed in an autonomous transaction and is committed when it completes.
    wait           Waiting time when no message meets the search criteria. Options are:
                   • FOREVER: wait forever. (Default value)
                   • NO_WAIT: return without waiting.
                   • Number: waiting time in seconds.
    msgid          Identifier of the message to dequeue.
    correlation    Correlation identifier of the message to dequeue. Pattern-matching characters, such as the percent sign (%) and underscore (_), can be used.
    deq_condition  Conditional expression on message properties or message data. This is a Boolean expression written like a condition in a SQL WHERE clause. Message properties include priority, corrid, and other columns of the queue table.
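
As an illustration of how these fields combine, the following sketch inspects a message without removing it. It assumes a single-consumer queue named my_q (the name used in the ENQUEUE example of this chapter) that holds a message whose correlation identifier starts with 'abc'. The five-second wait is an arbitrary value, and how a timeout is reported when no message matches is not shown here.

```plsql
DECLARE
   dequeue_options     DBMS_AQ.dequeue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             RAW(1000);
BEGIN
   -- Read the message without locking or removing it.
   dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
   -- Start from the first message that meets the search criteria.
   dequeue_options.navigation   := DBMS_AQ.FIRST_MESSAGE;
   -- Wait at most 5 seconds (arbitrary value) instead of the default FOREVER.
   dequeue_options.wait         := 5;
   -- '%' is a pattern character: match any correlation starting with 'abc'.
   dequeue_options.correlation  := 'abc%';
   -- consumer_name stays NULL because my_q is assumed to be single-consumer.

   DBMS_AQ.DEQUEUE(
      queue_name          => 'my_q',
      dequeue_options     => dequeue_options,
      message_properties  => message_properties,
      payload             => message,
      msgid               => message_handle);

   DBMS_OUTPUT.PUT_LINE('msgid: ' || message_handle);
   DBMS_OUTPUT.PUT_LINE('priority: ' || message_properties.priority);
END;
/
```

Because dequeue_mode is BROWSE, the message stays in the queue and the visibility field is ignored.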

5.2.4. ENQUEUE_OPTIONS_T

Specifies options used during enqueue.

Details about the ENQUEUE_OPTIONS_T type are as follows:

  • Prototype

    TYPE ENQUEUE_OPTIONS_T IS RECORD 
    (
         visibility        BINARY_INTEGER  DEFAULT ON_COMMIT
    );
  • Field

    Field       Description
    visibility  Whether the enqueue operation is included in the current transaction. Options are:
                • ON_COMMIT: include the enqueue operation in the current transaction. (Default value)
                • IMMEDIATE: do not include the enqueue operation in the current transaction. The operation is processed in an autonomous transaction and is committed when it completes.
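
The difference between the two visibility options can be sketched as follows. The example assumes the single-consumer queue my_q used elsewhere in this chapter; the ROLLBACK is included only to show that an IMMEDIATE enqueue is not part of the calling transaction.

```plsql
DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
BEGIN
   -- Enqueue in an autonomous transaction that is committed on completion.
   enqueue_options.visibility := DBMS_AQ.IMMEDIATE;

   DBMS_AQ.ENQUEUE(
      queue_name          => 'my_q',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => HEXTORAW('FFFF'),
      msgid               => message_handle);

   -- Rolling back the current transaction does not undo the enqueue above,
   -- because the enqueue was not included in this transaction.
   ROLLBACK;

   DBMS_OUTPUT.PUT_LINE('msgid: ' || message_handle);
END;
/
```

With the default ON_COMMIT value, the same ROLLBACK would discard the enqueued message along with the rest of the transaction.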

5.2.5. MESSAGE_PROPERTIES_T

Specifies the properties of each message. The properties are set when a message is enqueued and are returned when the message is dequeued.

Details about the MESSAGE_PROPERTIES_T type are as follows:

  • Prototype

    TYPE MESSAGE_PROPERTIES_T IS RECORD 
    (
         priority          BINARY_INTEGER  NOT NULL DEFAULT 1,
         correlation       VARCHAR2(128)   DEFAULT NULL,
         recipient_list    AQ$_RECIPIENT_LIST_T,
         enqueue_time      DATE
    );
  • Field

    Field           Description
    priority        Message priority. A smaller number means a higher priority. Any integer, including a negative number, can be used.
    correlation     Correlation identifier specified by the producer when the message is enqueued.
    recipient_list  Index-by table of AQ$_AGENT agents used to directly specify message receivers. This field can be used only for a multiple-consumer queue. If not set, only default subscribers are receivers. This field is not returned to consumers during dequeue.
    enqueue_time    Point in time when the message was enqueued. The value is determined by the system and cannot be specified by the user when the message is enqueued.
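
A minimal sketch of filling recipient_list before an enqueue is shown below. It assumes that AQ$_AGENT can be referenced as DBMS_AQ.aq$_agent, since this chapter lists it among the types of the package, and that my_multi_q is a multiple-consumer queue. The consumer name 'GREEN' is hypothetical; 'RED' is the consumer used in the dequeue examples of this chapter.

```plsql
DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   recipient           DBMS_AQ.aq$_agent;  -- assumed package qualification
   message_handle      RAW(16);
BEGIN
   -- Only the name field is set; address and protocol are unused.
   recipient.name := 'RED';
   message_properties.recipient_list(1) := recipient;

   recipient.name := 'GREEN';  -- hypothetical second consumer
   message_properties.recipient_list(2) := recipient;

   DBMS_AQ.ENQUEUE(
      queue_name          => 'my_multi_q',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => HEXTORAW('FFFF'),
      msgid               => message_handle);

   -- The default visibility is ON_COMMIT, so the enqueue is part of
   -- this transaction and takes effect when it is committed.
   COMMIT;
END;
/
```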

5.2.6. MESSAGE_PROPERTIES_ARRAY_T

Specifies the properties of multiple messages when dbms_aq.enqueue_array or dbms_aq.dequeue_array is called. The message_properties_array_t varray must contain as many elements as the number of payloads specified in payload_array.

Details about the MESSAGE_PROPERTIES_ARRAY_T type are as follows:

  • Prototype

    TYPE MESSAGE_PROPERTIES_ARRAY_T IS VARRAY(2147483647)
            OF MESSAGE_PROPERTIES_T;

5.2.7. MSGID_ARRAY_T

Returns the identifiers of the messages that are enqueued or dequeued when dbms_aq.enqueue_array or dbms_aq.dequeue_array is called.

Details about the MSGID_ARRAY_T type are as follows:

  • Prototype

    TYPE MSGID_ARRAY_T IS TABLE OF RAW(16)
            INDEX BY BINARY_INTEGER;

5.2.8. PAYLOAD_ARRAY_T

Specifies multiple payloads when dbms_aq.enqueue_array or dbms_aq.dequeue_array is called. Currently, only the RAW payload type is supported.

Details about the PAYLOAD_ARRAY_T type are as follows:

  • Prototype

    TYPE PAYLOAD_ARRAY_T IS VARRAY(2147483647)
            OF RAW(32767);

5.3. Procedures and Functions

This section describes the procedures and functions provided by the DBMS_AQ package, in alphabetical order.

5.3.1. DEQUEUE

Dequeues a message from a target queue. The message is identified according to the values of consumer_name, msgid, correlation, or deq_condition in dequeue_options_t. For a description of each field, refer to “5.2.3. DEQUEUE_OPTIONS_T” in “5.2. Types”.

The dequeue order is determined by the sort order specified when the queue table is created, unless it is overridden by the msgid or correlation options in dequeue_options_t. The consistent read rule applies to enqueue and dequeue. For example, a message enqueued after a BROWSE operation starts may not be visible to that operation.

The default value of the navigation field is NEXT_MESSAGE. If dequeue is executed repeatedly with this value, messages are returned sequentially based on a snapshot taken when the first message is dequeued. As a result, a message enqueued after the first message is dequeued can be consumed only after all messages in that snapshot have been consumed. This is usually acceptable. However, if the first message of the queue must always be dequeued, for reasons such as priority, the FIRST_MESSAGE option must be used. For example, FIRST_MESSAGE is required to process a higher-priority message first when it is enqueued while previously enqueued messages are still being processed.

Details about the DEQUEUE procedure are as follows:

  • Prototype

    DBMS_AQ.DEQUEUE 
    (
        queue_name         IN  VARCHAR2,
        dequeue_options    IN  DEQUEUE_OPTIONS_T,
        message_properties OUT MESSAGE_PROPERTIES_T,
        payload            OUT RAW,
        msgid              OUT RAW
    );
  • Parameter

    Parameter           Description
    queue_name          Name of the queue from which to dequeue a message.
    dequeue_options     Options that apply to all messages to dequeue. For more information, refer to “5.2.3. DEQUEUE_OPTIONS_T” in “5.2. Types”.
    message_properties  Properties of a message to return. For more information, refer to “5.2.5. MESSAGE_PROPERTIES_T” in “5.2. Types”.
    payload             User-specified payload. Currently, only the RAW type is supported.
    msgid               Identifier of a dequeued message.
  • Example

    DECLARE
       dequeue_options     DBMS_AQ.dequeue_options_t;
       message_properties  DBMS_AQ.message_properties_t;
       message_handle      RAW(16);
       message             RAW(1000);
    BEGIN
       -- Dequeue only messages addressed to the consumer 'RED'.
       dequeue_options.consumer_name := 'RED';
       -- Filter on a queue table column, written like a SQL WHERE condition.
       dequeue_options.deq_condition := 'enq_time is not null';
       DBMS_AQ.DEQUEUE(
          queue_name          =>     'my_multi_q',
          dequeue_options     =>     dequeue_options,
          message_properties  =>     message_properties,
          payload             =>     message,
          msgid               =>     message_handle);
       -- Print the identifier of the dequeued message.
       DBMS_OUTPUT.PUT_LINE('msgid: ' || message_handle);
    END;
    /
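
The navigation behavior described in this section can be sketched with a loop that sets FIRST_MESSAGE, so that each call looks at the head of the queue again instead of continuing from the earlier snapshot. The sketch assumes that my_multi_q was created with a sort order based on priority and that at least three messages are available for the consumer 'RED'. With fewer messages, the default wait value of FOREVER makes the call block. The loop count of three is arbitrary.

```plsql
DECLARE
   dequeue_options     DBMS_AQ.dequeue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             RAW(1000);
BEGIN
   dequeue_options.consumer_name := 'RED';
   -- Reset the position on every call so that a higher-priority message
   -- enqueued in the meantime is picked up first.
   dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;

   FOR i IN 1..3 LOOP  -- arbitrary number of messages for illustration
      DBMS_AQ.DEQUEUE(
         queue_name          => 'my_multi_q',
         dequeue_options     => dequeue_options,
         message_properties  => message_properties,
         payload             => message,
         msgid               => message_handle);

      DBMS_OUTPUT.PUT_LINE('priority: ' || message_properties.priority ||
                           ', msgid: ' || message_handle);
   END LOOP;

   -- The default visibility is ON_COMMIT, so the removals take effect
   -- when this transaction is committed.
   COMMIT;
END;
/
```

Leaving navigation at its default of NEXT_MESSAGE would instead return messages in the order fixed by the snapshot taken at the first dequeue.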

5.3.2. DEQUEUE_ARRAY

Dequeues multiple messages in a single operation, and returns the messages using a payload array, a message property array, and a message identifier array. The return value is the number of successfully dequeued messages.

The waiting time specified in dequeue_options_t applies only when the queue has no message to dequeue. If the queue has one or more messages to dequeue, DEQUEUE_ARRAY dequeues up to array_size messages and then returns immediately.

The msgid field of dequeue_options_t cannot be specified for DEQUEUE_ARRAY. For information about navigation parameters, refer to “5.2.3. DEQUEUE_OPTIONS_T” in “5.2. Types”.

Details about the DEQUEUE_ARRAY function are as follows:

  • Prototype

    DBMS_AQ.DEQUEUE_ARRAY 
    (
        queue_name                IN   VARCHAR2,
        dequeue_options           IN   DEQUEUE_OPTIONS_T,
        array_size                IN   PLS_INTEGER,
        message_properties_array  OUT  MESSAGE_PROPERTIES_ARRAY_T,
        payload_array             OUT  PAYLOAD_ARRAY_T,
        msgid_array               OUT  MSGID_ARRAY_T
    ) 
    RETURN pls_integer;
  • Parameter

    Parameter                 Description
    queue_name                Name of the queue from which to dequeue messages.
    dequeue_options           Options that apply to all messages to dequeue. For more information, refer to “5.2.3. DEQUEUE_OPTIONS_T” in “5.2. Types”.
    array_size                Number of messages to dequeue.
    message_properties_array  Property array of messages to return. For more information, refer to “5.2.6. MESSAGE_PROPERTIES_ARRAY_T” in “5.2. Types”.
    payload_array             Payload array of messages to return. For more information, refer to “5.2.8. PAYLOAD_ARRAY_T” in “5.2. Types”.
    msgid_array               Identifier array of dequeued messages. For more information, refer to “5.2.7. MSGID_ARRAY_T” in “5.2. Types”.
  • Example

    DECLARE
      dequeue_options       DBMS_AQ.dequeue_options_t;
      msg_prop_array        DBMS_AQ.message_properties_array_t;
      payload_array         DBMS_AQ.payload_array_t;
      msgid_array           DBMS_AQ.msgid_array_t;
      retval                PLS_INTEGER;
    BEGIN
      dequeue_options.consumer_name := 'RED';
    
      retval := DBMS_AQ.DEQUEUE_ARRAY(
                  queue_name               => 'my_multi_q',
                  dequeue_options          => dequeue_options,
                  array_size               => 10,
                  message_properties_array => msg_prop_array,
                  payload_array            => payload_array,
                  msgid_array              => msgid_array);
    
      DBMS_OUTPUT.PUT_LINE('Number of messages dequeued: ' || retval);
      FOR i IN 1..retval LOOP
        DBMS_OUTPUT.PUT_LINE('msgid(' || i || '): ' || msgid_array(i));
      END LOOP;
    END;
    /
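
As a variation on the example above, the following sketch limits the waiting time and reads the returned properties and payloads as well as the identifiers. The ten-second wait is an arbitrary value, and what the function does when the wait expires with no message is not shown here. The loop is bounded by the return value because that is the number of messages actually dequeued.

```plsql
DECLARE
  dequeue_options  DBMS_AQ.dequeue_options_t;
  msg_prop_array   DBMS_AQ.message_properties_array_t;
  payload_array    DBMS_AQ.payload_array_t;
  msgid_array      DBMS_AQ.msgid_array_t;
  retval           PLS_INTEGER;
BEGIN
  dequeue_options.consumer_name := 'RED';
  -- Applies only if the queue has no message to dequeue (arbitrary value).
  dequeue_options.wait := 10;

  retval := DBMS_AQ.DEQUEUE_ARRAY(
              queue_name               => 'my_multi_q',
              dequeue_options          => dequeue_options,
              array_size               => 10,
              message_properties_array => msg_prop_array,
              payload_array            => payload_array,
              msgid_array              => msgid_array);

  -- Element i of each array describes the same dequeued message.
  FOR i IN 1..retval LOOP
    DBMS_OUTPUT.PUT_LINE('msgid(' || i || '): ' || msgid_array(i));
    DBMS_OUTPUT.PUT_LINE('priority(' || i || '): ' || msg_prop_array(i).priority);
    DBMS_OUTPUT.PUT_LINE('payload(' || i || '): ' || payload_array(i));
  END LOOP;
END;
/
```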

5.3.3. ENQUEUE

Enqueues a message to the target queue. For a multiple-consumer queue, this procedure returns an error if no default subscriber is registered and no receiver is specified, because the message has no destination.

Details about the ENQUEUE procedure are as follows:

  • Prototype

    DBMS_AQ.ENQUEUE 
    (
        queue_name         IN  VARCHAR2,
        enqueue_options    IN  ENQUEUE_OPTIONS_T,
        message_properties IN  MESSAGE_PROPERTIES_T,
        payload            IN  RAW,
        msgid              OUT RAW
    );
  • Parameter

    Parameter           Description
    queue_name          Name of the queue to enqueue a message to.
    enqueue_options     Options that apply to all messages to enqueue. For more information, refer to “5.2.4. ENQUEUE_OPTIONS_T” in “5.2. Types”.
    message_properties  Properties of the message to enqueue. For more information, refer to “5.2.5. MESSAGE_PROPERTIES_T” in “5.2. Types”.
    payload             User-specified payload. Currently, only the RAW type is supported.
    msgid               Message identifier assigned by the system. The value is unique and can be used to identify a message to dequeue.
  • Example

    DECLARE
       enqueue_options     DBMS_AQ.enqueue_options_t;
       message_properties  DBMS_AQ.message_properties_t;
       message_handle      RAW(16);
    BEGIN
       message_properties.priority := 2;
       message_properties.correlation := 'abc';
       DBMS_AQ.ENQUEUE(
          queue_name              => 'my_q',
          enqueue_options         => enqueue_options,
          message_properties      => message_properties,
          payload                 => hextoraw('FFFF'),
          msgid                   => message_handle);
       dbms_output.put_line('msgid: ' || message_handle);
    END;
    /

5.3.4. ENQUEUE_ARRAY

Enqueues multiple payloads together with their corresponding message properties in a single operation, and returns an array that contains the identifiers of the enqueued messages. The return value is the number of successfully enqueued messages.

Details about the ENQUEUE_ARRAY function are as follows:

  • Prototype

    DBMS_AQ.ENQUEUE_ARRAY 
    (
        queue_name               IN  VARCHAR2,
        enqueue_options          IN  ENQUEUE_OPTIONS_T,
        array_size               IN  PLS_INTEGER,
        message_properties_array IN  MESSAGE_PROPERTIES_ARRAY_T,
        payload_array            IN  PAYLOAD_ARRAY_T,
        msgid_array              OUT MSGID_ARRAY_T
    ) 
    RETURN pls_integer;
  • Parameter

    Parameter                 Description
    queue_name                Name of the queue to enqueue messages to.
    enqueue_options           Options that apply to all messages to enqueue. For more information, refer to “5.2.4. ENQUEUE_OPTIONS_T” in “5.2. Types”.
    array_size                Number of messages to enqueue.
    message_properties_array  Property array of messages to enqueue. For more information, refer to “5.2.6. MESSAGE_PROPERTIES_ARRAY_T” in “5.2. Types”.
    payload_array             Payload array of messages to enqueue. For more information, refer to “5.2.8. PAYLOAD_ARRAY_T” in “5.2. Types”.
    msgid_array               Identifier array of enqueued messages. For more information, refer to “5.2.7. MSGID_ARRAY_T” in “5.2. Types”.
  • Example

    DECLARE
      v               DBMS_AQ.payload_array_t;
      msg_prop_array  DBMS_AQ.message_properties_array_t;
      retval          PLS_INTEGER;
      msgid_array     DBMS_AQ.msgid_array_t;
      enqueue_options DBMS_AQ.enqueue_options_t;
    BEGIN
      -- Build six RAW payloads.
      v := DBMS_AQ.payload_array_t (HEXTORAW('FFFF'), HEXTORAW('EEEE'), 
           HEXTORAW('EEEE'), HEXTORAW('EEEE'), HEXTORAW('EEEE'), HEXTORAW('EEEE'));
      -- Create one message property element per payload.
      msg_prop_array := DBMS_AQ.message_properties_array_t ();
      msg_prop_array.EXTEND(6);
    
      -- Assign priorities 6 down to 1, so the last payload has the highest priority.
      FOR i IN 1..6 LOOP
        msg_prop_array(i).priority := 7 - i;
      END LOOP;
    
      retval := DBMS_AQ.ENQUEUE_ARRAY (
          queue_name               => 'my_multi_q',
          enqueue_options          => enqueue_options,
          array_size               => 6,
          message_properties_array => msg_prop_array,
          payload_array            => v,
          msgid_array              => msgid_array);
      DBMS_OUTPUT.PUT_LINE('processed: ' || retval);
      -- Print the identifier assigned to each enqueued message.
      FOR i IN 1..retval LOOP
        DBMS_OUTPUT.PUT_LINE('msgid(' || i || '): ' || msgid_array(i));
      END LOOP;
    END;
    /