Search notes:

Oracle Scheduler: Event based job (queue)

Create «payload» type. The values of attributes of this type are put into the queue which triggers the job.
create or replace type TQ84_PAYLOAD
   authid definer
as object
(
   cat varchar2(20),
   txt varchar2(20)
);
/
Create queue and queue table:
begin
   dbms_aqadm.create_queue_table (
      queue_table        => 'TQ84_QUEUE_TABLE',
      queue_payload_type => 'TQ84_PAYLOAD',
      multiple_consumers =>  TRUE
   );

   dbms_aqadm.create_queue(
      queue_name  => 'TQ84_QUEUE',
      queue_table => 'TQ84_QUEUE_TABLE'
   );

   dbms_aqadm.start_queue (queue_name => 'TQ84_QUEUE' ) ;

end;
/
Create table to be populated by job when it is triggered:
create table tq84_data(
   ins_dt timestamp,
   val    varchar2(10)
);
The procuedure that is triggered when a message is put into the queue. This procedure is passed an instance of the payload type and inserts the payload's txt value into tq84_data:
create or replace procedure tq84_handle_msg(msg in TQ84_PAYLOAD)
   authid definer
as
begin
   insert into tq84_data values (systimestamp, msg.txt);

   commit;
end;
/
Create a program object for the procedure:
begin

   dbms_scheduler.create_program (
      program_name        => 'TQ84_PRG'        ,
      program_type        => 'STORED_PROCEDURE',
      program_action      => 'TQ84_HANDLE_MSG' ,
      number_of_arguments =>  1
   );

   dbms_scheduler.define_metadata_argument (
      program_name       => 'TQ84_PRG',
      argument_position  =>  1,
      metadata_attribute => 'EVENT_MESSAGE'
   );

   dbms_scheduler.enable('TQ84_PRG');
end;
/
To create an event-based job, two attributes must be set:
begin

   dbms_scheduler.create_job (
      job_name            => 'TQ84_JOB' ,
      start_date          =>  systimestamp,
   -- The name of the queue where messages are enqueued that trigger the job:
      queue_spec          => 'TQ84_QUEUE',
   -- The condition is expressed in the syntax of an Advanced Queueing rule (note tab.user_data): 
      event_condition     =>q'[tab.user_data.cat='proc']', -- 
      program_name        => 'TQ84_PRG',
      auto_drop           =>  false,
      enabled             =>  true
   );

-- 
-- The Scheduler runs the event-based job for each occurrence of an event that
-- matches event_condition. However, by default, events that occur while the
-- job is already running are ignored; the event gets consumed, but does not
-- trigger another run of the job.
-- This behavior can be changed by setting the job attribute
-- PARALLEL_INSTANCES to TRUE. In this case, an instance of the job is started
-- for every instance of the event, and all job instances are lightweight jobs.
--
   dbms_scheduler.set_attribute (
      name                => 'TQ84_JOB',
      attribute           => 'PARALLEL_INSTANCES',
      value               =>  true
   );

end;
/
Create a procedure to enqueue (insert) a message into the queue:
create procedure tq84_enqueue_msg(payload tq84_payload)
   authid definer
as
   id       raw(16);
   props    dbms_aq.message_properties_t;
   enqopts  dbms_aq.enqueue_options_t;
begin

   sys.dbms_aq.enqueue(
     'TQ84_QUEUE',
      enqopts,
      props,
      payload,
      id
   );

   dbms_output.put_line('Enqued message with id = ' || id);

   commit;

end;
/
Insert three messages:
begin
   tq84_enqueue_msg(tq84_payload('proc', 'abc'));
   tq84_enqueue_msg(tq84_payload('proc', 'def'));
   tq84_enqueue_msg(tq84_payload('proc', 'jkl'));
end;
/
The following block throws ORA-24033: no recipients for message:
begin
   tq84_enqueue_msg(tq84_payload('xyz', 'ghi'));
end;
/
After a short while, the value of txt is transferred into tq84_data:
select * from tq84_data;
Cleaning up.
Drop the job …
begin
   dbms_scheduler.drop_job (job_name => 'TQ84_JOB');
   dbms_scheduler.purge_log(job_name => 'TQ84_JOB');
end;
/
… the program …
begin
   dbms_scheduler.drop_program('TQ84_PRG');
end;
/
… the queue and the queue table …
begin
   dbms_aqadm.stop_queue(queue_name        => 'TQ84_QUEUE'      );
   dbms_aqadm.drop_queue(queue_name        => 'TQ84_QUEUE'      );
   dbms_aqadm.drop_queue_table(queue_table => 'TQ84_QUEUE_TABLE');
end;
/
… and the other objects:
drop procedure tq84_handle_msg;
drop procedure tq84_enqueue_msg;
drop type      tq84_payload;
drop table     tq84_data;

Index