Monday, December 12, 2016

Queue-based Concurrent Stats Prototype Implementation

This is just a prototype of a queue-based concurrent statistics implementation, built on the same basic approach I used a couple of years ago to create indexes concurrently.

There are reasons why such an implementation might be useful. In 11.2.0.x the built-in Concurrent Stats feature can turn out to be rather inefficient: it creates lots of jobs that potentially attempt to gather statistics for different sub-objects of the same table at the same time, which can lead to massive contention on Library Cache level due to the exclusive Library Cache locks required by DDL / DBMS_STATS calls.

In 12.1 the Concurrent Stats feature obviously got a major rewrite and now applies some more intelligent processing to decide what should be processed concurrently and how. Some of the details are exposed via the new view DBA_OPTSTAT_OPERATION_TASKS. But again I've seen it run lots of very small jobs serially, one after the other, in the main session, which can be a performance problem if the sheer number of objects to analyze is huge.

This prototype tries to work around these problems by using a queue-based approach for true concurrent processing, in combination with an attempt to use some "intelligent" ordering of the objects to analyze, in the hope of minimizing contention on Library Cache level.

This prototype determines the objects to gather statistics on by calling DBMS_STATS.GATHER_DATABASE_STATS using one of the available LIST* options - so it's supposed to replace a call to GATHER_DATABASE_STATS or the built-in default nightly statistics job.
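To get a feeling for what such a LIST* call returns, the following anonymous block is a minimal sketch that prints the reported objects without gathering anything. It uses the same OPTIONS / OBJLIST parameters as the package code further down; the variable name l_objects is just an illustration.

```sql
set serveroutput on

declare
  -- Collection type returned by the LIST* options of GATHER_DATABASE_STATS
  l_objects dbms_stats.objecttab;
begin
  -- LIST AUTO only reports the objects, it does not gather statistics
  dbms_stats.gather_database_stats(
    options => 'LIST AUTO'
  , objlist => l_objects
  );
  for i in 1 .. l_objects.count loop
    dbms_output.put_line(
         l_objects(i).objtype || ' '
      || l_objects(i).ownname || '.' || l_objects(i).objname
      || case when l_objects(i).partname is not null then ' ' || l_objects(i).partname end
      || case when l_objects(i).subpartname is not null then ' ' || l_objects(i).subpartname end
    );
  end loop;
end;
/
```

Only the entries with OBJTYPE = 'TABLE' are picked up by the prototype.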

The jobs for concurrent stats gathering are created using DBMS_SCHEDULER and a custom job class, which offers the feature of binding the jobs to a specific service. This can come in handy if, for example, you want these jobs to execute only on certain node(s) of a RAC cluster database.
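As a rough sketch of what binding a job class to a service looks like when done manually: CONC_STATS is the job class name used by this script, while STATS_SVC is a made-up service name that you would replace with a service defined in your cluster. The package may well create the class differently, so treat this as an illustration only.

```sql
begin
  dbms_scheduler.create_job_class(
    job_class_name => 'CONC_STATS'
  , service        => 'STATS_SVC'  -- hypothetical service name, adjust as required
  , comments       => 'Job class for the concurrent stats threads'
  );
end;
/
```

Jobs created with JOB_CLASS => 'CONC_STATS' will then only run on instances that offer that service.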

It comes with the following (known) limitations:

- The current implementation offers only rudimentary logging to a very simple log table, which also gets truncated at the start of each run, so no history from previous runs gets retained. In 12c this is not such an issue since DBA_OPTSTAT_OPERATION_TASKS contains a lot of details for each individual stats gathering call.

- Currently only objects of type TABLE returned by GATHER_DATABASE_STATS are considered, assuming the CASCADE option will take care of any indexes to gather statistics for.

- The default behaviour attempts to make use of all available CPUs of a single node by starting as many threads as defined via CPU_COUNT. If you explicitly specify the number of concurrent threads, the default behaviour is to use a DEGREE (or DOP) per gather stats operation that again makes use of all available CPU resources: DEGREE = CPU_COUNT divided by number_of_threads. If you don't want that, you will have to specify the DEGREE / DOP explicitly, too.

- I haven't spent time investigating how this behaves with regard to incremental statistics. Since it's possible that GLOBAL and (sub-)partition statistics of the same object get gathered in different jobs, potentially at the same time and in random order (so GLOBAL prior to partition, for example), the outcome and behaviour with incremental statistics turned on could be a problem.
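To illustrate the default DOP calculation mentioned above: the package rounds up, so the DOP is CEIL(CPU_COUNT / number_of_threads). The following query is a small sketch that shows the result for an assumed thread count of 4 (the literal 4 is just an example value); with CPU_COUNT = 16 this gives a DOP of 4 per gather stats call.

```sql
-- Default intra-operation DOP for an assumed 4 concurrent threads
select to_number(value)           as cpu_count
     , 4                          as threads
     , ceil(to_number(value) / 4) as default_dop
from   v$parameter
where  name = 'cpu_count';
```

Keep in mind that threads * DOP Parallel Execution servers can be requested at the same time, so PARALLEL_MAX_SERVERS needs to be sized accordingly.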

More details can be found in the comments section of the code, in particular what privileges might be required and how you could replace the default statistics job if desired.

The script provided includes a de-installation and installation part of the code. All that needs to be called then to start a database-wide gather statistics processing is the main entry point "pk_stats_concurrent.stats_concurrent" using any optional parameters as desired - consider in particular the parameters to control the number of threads and the intra-operation DOP as just outlined. See the code comments for a detailed description of the available parameters.
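A call with explicit settings could look like the following sketch. The parameter names are those of the package specification below; the values (4 threads, DOP 2, LIST STALE) are arbitrary examples, not recommendations.

```sql
begin
  pk_stats_concurrent.stats_concurrent(
    p_parallel_degree => 4             -- number of concurrent threads
  , p_intra_degree    => 2             -- DOP per gather stats call
  , p_gather_option   => 'LIST STALE'  -- only objects with stale statistics
  );
end;
/

-- Check what the threads have been doing so far
select log_timestamp, message
from   stats_concurrent_log
order  by log_timestamp;
```

Calling the procedure without any parameters uses the defaults described above.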
--------------------------------------------------------------------------------
--
-- Script:       pk_stats_concurrent.sql
--
-- Author:       Randolf Geist
--
-- Copyright:    http://oracle-randolf.blogspot.com
--
-- Purpose:      A queue based concurrent stats implementation - installation script
--
-- Usage:        @pk_stats_concurrent
--
--               The script will first drop all objects (can be skipped)
--
--               And then attempt to create the objects required (can be skipped)
--
--------------------------------------------------------------------------------

spool pk_stats_concurrent.log

prompt Concurrent Stats - Deinstallation
prompt *----------------------------------------------------*
prompt This script will now attempt to drop the objects
prompt belonging to this installation
accept skip_deinstall prompt 'Hit Enter to continue, CTRL+C to cancel or enter S to skip deinstall: '

set serveroutput on

declare
  procedure exec_ignore_fail(p_sql in varchar2)
  as
  begin
    execute immediate p_sql;
  exception
  when others then
    dbms_output.put_line('Error executing: ' || p_sql);
    dbms_output.put_line('Error message: ' || SQLERRM);
  end;
begin
  if upper('&skip_deinstall') = 'S' then
    null;
  else
    exec_ignore_fail('begin pk_stats_concurrent.teardown_aq; end;');
    exec_ignore_fail('drop table stats_concurrent_log');
    exec_ignore_fail('drop type stats_concurrent_info force');
    exec_ignore_fail('drop package pk_stats_concurrent');
    exec_ignore_fail('begin dbms_scheduler.drop_job_class(''CONC_STATS''); end;');
  end if;
end;
/

prompt Concurrent Stats - Installation
prompt *----------------------------------------------------*
prompt This script will now attempt to create the objects
prompt belonging to this installation
PAUSE  Hit CTRL+C to cancel, ENTER to continue...

/**
  * The log table for minimum logging of the concurrent execution threads
  * Since we cannot access the DBMS_OUTPUT of these separate processes
  * This needs to be cleaned up manually if / when required
  **/
create table stats_concurrent_log (log_timestamp timestamp, sql_statement clob, message clob);

/**
  * The single object type used as payload in the AQ queue for concurrent execution
  * Each message describes one table, partition or subpartition to gather statistics for, plus the degree and granularity to use
  **/
create or replace type stats_concurrent_info as object
(
  ownname                        varchar2(30)
, tabname                        varchar2(30)
, partname                       varchar2(30)
, degree                         number
, granularity                    varchar2(30)
);
/

show errors

create or replace package pk_stats_concurrent authid current_user
as

  ------------------------------------------------------------------------------
  -- $Id$
  ------------------------------------------------------------------------------

  /**
   * PK_STATS_CONCURRENT.SQL
   *
   * Created By    : Randolf Geist (http://oracle-randolf.blogspot.com)
   * Creation Date : 31-OCT-2016
   * Last Update   : 06-DEC-2016
   * Authors       : Randolf Geist (RG)
   *
   * History       :
   *
   * When        | Who | What
   * ----------------------------------
   * 31-OCT-2016 | RG  | Created
   * 06-DEC-2016 | RG  | This header comment updated
   *
   * Description   :
   *
   * This is a simple prototype implementation for the given task of gathering database stats
   * concurrently, in case you are not satisfied with the built-in concurrent stats option available since 11.2.0.x
   *
   * In 11.2, the CONCURRENT stats option creates as many jobs as there are objects to gather
   * And the JOB_QUEUE_PROCESSES parameter then controls the number of concurrent jobs running
   * Since the ordering of execution isn't really optimized, many of these concurrent jobs might attempt to gather stats on the same object in case it is (sub)partitioned
   * This can lead to significant contention on Library Cache level (due to exclusive Library Cache Locks required by DDL / DBMS_STATS)
   *
   * In 12.1 the CONCURRENT stats option was obviously completely rewritten and uses some more intelligent processing
   * by calculating if, and if so how many, jobs should run concurrently for what kind of objects (see for example the new DBA_OPTSTAT_OPERATION_TASKS view that exposes some of these details)
   * Still I've observed many occasions with this new implementation where lots of objects were deliberately gathered in the main session
   * one after the other which doesn't really make good use of available resources in case many objects need to be analyzed
   *
   * This implementation tries to work around these points by using a simple queue-based approach for true concurrent stats processing
   * combined with an attempt to distribute the tables to analyze across the different threads in a way that minimizes the contention on Library Cache level
   *
   * It needs to be installed / executed under a suitable account that has the privileges to create queues, types, packages, tables, jobs and job classes and gather stats on the whole database
   *
   * A sample user profile could look like this:

   create user conc_stats identified by conc_stats;

   grant create session, create table, create procedure, create type, create job, manage scheduler, analyze any, analyze any dictionary to conc_stats;

   grant execute on sys.dbms_aq to conc_stats;

   grant execute on sys.dbms_aqadm to conc_stats;

   grant select on sys.v_$parameter to conc_stats;

   alter user conc_stats default tablespace users;

   alter user conc_stats quota unlimited on users;

   * Parameters to be checked, depending on concurrency desired:
   *
   * job_queue_processes: Needs to be set high enough to accommodate for the concurrent stats threads spawned. By default this package spawns CPU_COUNT concurrent threads
   * parallel_max_servers: If a stats thread is supposed to use Parallel Execution (degree > 1) for gathering stats you'll need at least threads * degree Parallel Slaves configured
   * services: It's possible to specify a service to have the stats threads only executed on RAC nodes that run that service
   *
   * The jobs are created under the same job class, currently hard coded value CONC_STATS - this makes the handling easier in case you want to stop / drop the jobs submitted manually
   *
   * The job class name can be passed to calls to DBMS_SCHEDULER.DROP_JOB or STOP_JOB - remember that job classes are owned by SYS, so you have to specify SYS.CONC_STATS for the job class name used here
   *
   * The main entry point STATS_CONCURRENT is all you need to call to start concurrent stats gathering on the database
   * similar to GATHER_DATABASE_STATS using one of the options GATHER, GATHER STALE or GATHER AUTO (default) - here you have to use LIST EMPTY, LIST STALE or LIST AUTO (default)
   *
   * The default behaviour when not specifying any parameters is to start as many threads as there are CPUs by using the CPU_COUNT parameter
   * If you want this to be multiplied by the number of instances in a RAC cluster uncomment the CLUSTER_DATABASE_INSTANCES reference below in the code (assuming same CPU_COUNT on all nodes)
   *
   * This also means that the "intra" parallelism per gather_table_stats call will be one in such a case since the intra parallelism is calculated by default as CPU_COUNT / number of threads
   *
   * If you don't want to have that many threads / PX slaves started, specify the number of concurrent threads and an intra-operation DOP explicitly when calling STATS_CONCURRENT
   *
   * If you want to replace the default nightly stats job with this here, the following steps should achieve this:

   BEGIN DBMS_AUTO_TASK_ADMIN.disable(
     client_name => 'auto optimizer stats collection',
     operation   => NULL,
     window_name => NULL);
   END;

   BEGIN DBMS_SCHEDULER.CREATE_JOB(
     job_name => '',
     schedule_name => 'MAINTENANCE_WINDOW_GROUP',
     job_type => 'PLSQL_BLOCK',
     job_action => 'begin pk_stats_concurrent.stats_concurrent(); end;',
     comments => 'auto optimizer stats collection replacement using concurrent stats operations based on AQ',
     enabled => true);
   END;

   * Please ensure the job is submitted under the account it's supposed to be run - using a different account like SYS to submit the job for a different schema
   * seems to cause problems with privileges (insufficient privileges error messages), at least this was reported to me
   *
   * The code at present only processes objects of type TABLE returned by GATHER_DATABASE_STATS but not indexes
   * assuming that these should be covered by the CASCADE functionality
   *
   * Note: This script is a prototype and comes with NO warranty. Use at your own risk and test/adjust in your environment as necessary
   *       before using it in any production-like case
   *
   * @headcom
   **/

  subtype oracle_object      is varchar2(30);

  /**
   * Let the procedure stats_concurrent decide itself which degree to use.
   * At present this means simply to spawn as many child threads as defined by the CPU_COUNT parameter
   **/
  G_AUTO_PARALLEL_DEGREE            constant integer       := null;

  /**
   * The main entry point to gather statistics via parallel threads / AQ
   * @param p_parallel_degree The number of threads to start. G_AUTO_PARALLEL_DEGREE means use the CPU_COUNT parameter to determine the number of threads automatically
   * @param p_intra_degree The DOP to use per stats operation, by default calculate DOP based on CPU_COUNT and number of threads to run concurrently (Default DOP = CPU_COUNT / number of threads)
   * @param p_service Specify a service if you want the jobs to be assigned to that particular service, default NULL
   * @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
   * @param p_optional_init Optionally a SQL can be passed usually used to initialize the session
            for example forcing a particular parallel degree
   **/
  procedure stats_concurrent(
    p_parallel_degree in integer  default G_AUTO_PARALLEL_DEGREE
  , p_intra_degree    in integer  default null
  , p_service         in varchar2 default null
  , p_gather_option   in varchar2 default 'LIST AUTO'
  , p_optional_init   in varchar2 default null
  );

  /**
   * Setup the AQ infrastructure (Queue tables, Queues)
   **/
  procedure setup_aq;

  /**
   * Teardown the AQ infrastructure (Queue tables, Queues)
   **/
  procedure teardown_aq;

  /**
   * Helper function to populate the AQ queue with data to process
   * @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
   **/
  function list_stale_database_stats (
    p_gather_option    in  varchar2 default 'LIST AUTO'
  )
  return dbms_stats.objecttab pipelined;

  /**
   * Populate the AQ queue with data to process
   * @param p_parallel_degree The number of threads to use - will be used for proper data preparation / queueing order
   * @param p_intra_degree The DOP to use per stats operation, by default calculate DOP based on CPU_COUNT and number of threads to run concurrently
   * @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
   **/
  procedure populate_queue(
    p_parallel_degree in integer
  , p_intra_degree    in integer default null
  , p_gather_option   in varchar2 default 'LIST AUTO'
  );

  /**
   * This gets called for every stats thread
   * It pulls the object to gather from the AQ queue
   * @param p_optional_init Optionally a SQL can be passed usually used to initialize the session
            for example forcing a particular parallel degree
   **/
  procedure stats_thread(
    p_optional_init         in varchar2 default null
  );

end pk_stats_concurrent;
/

show errors

create or replace package body pk_stats_concurrent
as
  ------------------------------------------------------------------------------
  -- $Id$
  ------------------------------------------------------------------------------

  /**
   * PK_STATS_CONCURRENT.SQL
   *
   * Created By    : Randolf Geist (http://oracle-randolf.blogspot.com)
   * Creation Date : 31-OCT-2016
   * Last Update   : 06-DEC-2016
   * Authors       : Randolf Geist (RG)
   *
   * History       :
   *
   * When        | Who | What
   * ----------------------------------
   * 31-OCT-2016 | RG  | Created
   * 06-DEC-2016 | RG  | This header comment updated
   *
   * Description   :
   *
   * This is a simple prototype implementation for the given task of gathering database stats
   * concurrently, in case you are not satisfied with the built-in concurrent stats option available since 11.2.0.x
   *
   * In 11.2, the CONCURRENT stats option creates as many jobs as there are objects to gather
   * And the JOB_QUEUE_PROCESSES parameter then controls the number of concurrent jobs running
   * Since the ordering of execution isn't really optimized, many of these concurrent jobs might attempt to gather stats on the same object in case it is (sub)partitioned
   * This can lead to significant contention on Library Cache level (due to exclusive Library Cache Locks required by DDL / DBMS_STATS)
   *
   * In 12.1 the CONCURRENT stats option was obviously completely rewritten and uses some more intelligent processing
   * by calculating if, and if so how many, jobs should run concurrently for what kind of objects (see for example the new DBA_OPTSTAT_OPERATION_TASKS view that exposes some of these details)
   * Still I've observed many occasions with this new implementation where lots of objects were deliberately gathered in the main session
   * one after the other which doesn't really make good use of available resources in case many objects need to be analyzed
   *
   * This implementation tries to work around these points by using a simple queue-based approach for true concurrent stats processing
   * combined with an attempt to distribute the tables to analyze across the different threads in a way that minimizes the contention on Library Cache level
   *
   * It needs to be installed / executed under a suitable account that has the privileges to create queues, types, packages, tables, jobs and job classes and gather stats on the whole database
   *
   * A sample user profile could look like this:

   create user conc_stats identified by conc_stats;

   grant create session, create table, create procedure, create type, create job, manage scheduler, analyze any, analyze any dictionary to conc_stats;

   grant execute on sys.dbms_aq to conc_stats;

   grant execute on sys.dbms_aqadm to conc_stats;

   grant select on sys.v_$parameter to conc_stats;

   alter user conc_stats default tablespace users;

   alter user conc_stats quota unlimited on users;

   * Parameters to be checked, depending on concurrency desired:
   *
   * job_queue_processes: Needs to be set high enough to accommodate for the concurrent stats threads spawned. By default this package spawns CPU_COUNT concurrent threads
   * parallel_max_servers: If a stats thread is supposed to use Parallel Execution (degree > 1) for gathering stats you'll need at least threads * degree Parallel Slaves configured
   * services: It's possible to specify a service to have the stats threads only executed on RAC nodes that run that service
   *
   * The jobs are created under the same job class, currently hard coded value CONC_STATS - this makes the handling easier in case you want to stop / drop the jobs submitted manually
   *
   * The job class name can be passed to calls to DBMS_SCHEDULER.DROP_JOB or STOP_JOB - remember that job classes are owned by SYS, so you have to specify SYS.CONC_STATS for the job class name used here
   *
   * The main entry point STATS_CONCURRENT is all you need to call to start concurrent stats gathering on the database
   * similar to GATHER_DATABASE_STATS using one of the options GATHER, GATHER STALE or GATHER AUTO (default) - here you have to use LIST EMPTY, LIST STALE or LIST AUTO (default)
   *
   * The default behaviour when not specifying any parameters is to start as many threads as there are CPUs by using the CPU_COUNT parameter
   * If you want this to be multiplied by the number of instances in a RAC cluster uncomment the CLUSTER_DATABASE_INSTANCES reference below in the code (assuming same CPU_COUNT on all nodes)
   *
   * This also means that the "intra" parallelism per gather_table_stats call will be one in such a case since the intra parallelism is calculated by default as CPU_COUNT / number of threads
   *
   * If you don't want to have that many threads / PX slaves started, specify the number of concurrent threads and an intra-operation DOP explicitly when calling STATS_CONCURRENT
   *
   * If you want to replace the default nightly stats job with this here, the following steps should achieve this:

   BEGIN DBMS_AUTO_TASK_ADMIN.disable(
     client_name => 'auto optimizer stats collection',
     operation   => NULL,
     window_name => NULL);
   END;

   BEGIN DBMS_SCHEDULER.CREATE_JOB(
     job_name => '',
     schedule_name => 'MAINTENANCE_WINDOW_GROUP',
     job_type => 'PLSQL_BLOCK',
     job_action => 'begin pk_stats_concurrent.stats_concurrent(); end;',
     comments => 'auto optimizer stats collection replacement using concurrent stats operations based on AQ',
     enabled => true);
   END;

   * Please ensure the job is submitted under the account it's supposed to be run - using a different account like SYS to submit the job for a different schema
   * seems to cause problems with privileges (insufficient privileges error messages), at least this was reported to me
   *
   * The code at present only processes objects of type TABLE returned by GATHER_DATABASE_STATS but not indexes
   * assuming that these should be covered by the CASCADE functionality
   *
   * Note: This script is a prototype and comes with NO warranty. Use at your own risk and test/adjust in your environment as necessary
   *       before using it in any production-like case
   *
   * @headcom
   **/

  -- The queue name to use for AQ operations
  G_QUEUE_NAME               constant varchar2(24) := 'STATS_QUEUE';

  /**
   * Rudimentary logging required by the parallel threads since the
   * serveroutput generated cannot be accessed
   * @param p_sql The SQL statement to log
   * @param p_msg The message to log (for example an error message)
   **/
  procedure log(
    p_sql in clob
  , p_msg in clob
  )
  as
    -- We do this in an autonomous transaction since we want the logging
    -- to be visible while any other main transactions might be still going on
    pragma autonomous_transaction;
  begin
    insert into stats_concurrent_log(
      log_timestamp
    , sql_statement
    , message
    ) values (
      systimestamp
    , p_sql
    , p_msg
    );
    commit;
  end log;

  /**
   * Execute a SQL statement potentially in a different schema (dummy implementation here).
   * The string will be put to serveroutput before being executed
   * @param p_owner The schema to execute
   * @param p_sql The SQL to execute
   * @param p_log_error Should an error be logged or not. Default is true
   **/
  procedure execute(
    p_owner     in oracle_object
  , p_sql       in clob
  , p_log_error in boolean default true
  )
  as
  begin
    -- dbms_output.put_line('Owner: ' || p_owner || ' SQL: ' || substr(p_sql, 1, 4000));
  $if dbms_db_version.ver_le_10 $then
    declare
      a_sql           dbms_sql.varchar2a;
      n_start_line    number;
      n_end_line      number;
      c               integer;
      n               integer;
      LF constant     varchar2(10) := '
';
      len_LF constant integer := length(LF);
    begin
      n_start_line := 1 - len_LF;
      loop
        n_end_line := instr(p_sql, LF, n_start_line + len_LF);
        a_sql(a_sql.count + 1) := substr(p_sql, n_start_line + len_LF, case when n_end_line = 0 then length(p_sql) else n_end_line end - (n_start_line + len_LF) + len_LF);
        -- dbms_output.put_line(a_sql.count || ':' || a_sql(a_sql.count));
        exit when n_end_line = 0;
        n_start_line := n_end_line;
      end loop;
      c := dbms_sql.open_cursor;
      dbms_sql.parse(c, a_sql, 1, a_sql.count, false, dbms_sql.NATIVE);
      n := dbms_sql.execute(c);
      dbms_sql.close_cursor(c);
    end;
  $elsif dbms_db_version.ver_le_11 $then
    execute immediate p_sql;
  $else
    execute immediate p_sql;
  $end
  exception
  when others then
    dbms_output.put_line('Error: ' || SQLERRM);
    if p_log_error then
      log(p_sql, SQLERRM);
    end if;
    raise;
  end execute;

  /**
   * Execute a SQL statement potentially in a different schema (dummy implementation here).
   * This one uses an autonomous transaction.
   * The string will be put to serveroutput before being executed
   * @param p_owner The schema to execute
   * @param p_sql The SQL to execute
   * @param p_log_error Should an error be logged or not. Default is true
   **/
  procedure execute_autonomous(
    p_owner     in oracle_object
  , p_sql       in clob
  , p_log_error in boolean default true
  )
  as
    pragma autonomous_transaction;
  begin
    execute(p_owner, p_sql, p_log_error);
  end execute_autonomous;

  /**
   * Setup the AQ infrastructure (Queue tables, Queues)
   **/
  procedure setup_aq
  as
  begin
    begin
      execute(
        null
      , 'begin dbms_aqadm.create_queue_table(
           queue_table => ''' || G_QUEUE_NAME || '''
         , queue_payload_type => ''stats_concurrent_info''
         ); end;'
      );
    exception
    when others then
      dbms_output.put_line('Error creating Queue table: ' || SQLERRM);
      raise;
    end;

    begin
      execute(
        null
      , 'begin dbms_aqadm.create_queue(
           queue_name => ''' || G_QUEUE_NAME || '''
         , queue_table => ''' || G_QUEUE_NAME || '''
         ); end;'
      );
    exception
    when others then
      dbms_output.put_line('Error creating Queue: ' || SQLERRM);
      raise;
    end;

    begin
      execute(
        null
      , 'begin dbms_aqadm.start_queue(
           queue_name => ''' || G_QUEUE_NAME || '''
         ); end;'
      );
    exception
    when others then
      dbms_output.put_line('Error starting Queue: ' || SQLERRM);
      raise;
    end;
  end setup_aq;

  /**
   * Teardown the AQ infrastructure (Queue tables, Queues)
   **/
  procedure teardown_aq
  as
  begin
    begin
      execute(
        null
      , 'begin dbms_aqadm.stop_queue(
           queue_name => ''' || G_QUEUE_NAME || '''
         , wait       => true
         ); end;'
      , false
      );
    exception
    when others then
      dbms_output.put_line('Error stopping Queue: ' || SQLERRM);
      -- raise;
    end;

    begin
      execute(
        null
      , 'begin dbms_aqadm.drop_queue(
           queue_name => ''' || G_QUEUE_NAME || '''
         ); end;'
      , false
      );
    exception
    when others then
      dbms_output.put_line('Error dropping Queue: ' || SQLERRM);
      -- raise;
    end;

    begin
      execute(
        null
      , 'begin dbms_aqadm.drop_queue_table(
           queue_table => ''' || G_QUEUE_NAME || '''
         , force => true
         ); end;'
      , false
      );
    exception
    when others then
      dbms_output.put_line('Error dropping Queue table: ' || SQLERRM);
      -- raise;
    end;

  end teardown_aq;

  /**
   * Helper function to populate the AQ queue with data to process
   * @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
   **/
  function list_stale_database_stats (
    p_gather_option    in  varchar2 default 'LIST AUTO'
  )
  return dbms_stats.objecttab pipelined
  as
    pragma autonomous_transaction;
    m_object_list   dbms_stats.objecttab;
  begin
    if p_gather_option not in (
      'LIST AUTO', 'LIST STALE', 'LIST EMPTY'
    ) then
      -- Any other option would actually gather statistics, so return nothing
      null;
    else
      dbms_stats.gather_database_stats(
        options     => p_gather_option,
        objlist     => m_object_list
      );
      for i in 1..m_object_list.count loop
        pipe row (m_object_list(i));
      end loop;
    end if;
    return;
  end list_stale_database_stats;

  /**
   * Populate the AQ queue with data to process
   * @param p_parallel_degree The number of threads to use - will be used for proper data preparation / queueing order
   * @param p_intra_degree The DOP to use per stats operation, by default calculate DOP based on CPU_COUNT and number of threads to run concurrently
   * @param p_gather_option What to pass to GATHER_DATABASE_STATS as option, default LIST AUTO
   **/
  procedure populate_queue(
    p_parallel_degree in integer
  , p_intra_degree    in integer default null
  , p_gather_option   in varchar2 default 'LIST AUTO'
  )
  as
    enq_msgid     raw(16);
    payload       stats_concurrent_info := stats_concurrent_info(null, null, null, null, null);
    n_dop         integer;
  begin
    -- By default determine what intra-operation DOP to use depending on how many concurrent stats threads are supposed to run
    select nvl(p_intra_degree, ceil((select to_number(value) from v$parameter where name = 'cpu_count') / p_parallel_degree)) as dop
    into n_dop
    from dual;
    -- Populate the queue and use some "intelligent" ordering attempting to minimize (library cache) contention on the objects
    for rec in (
      with
      -- The baseline, all TABLE objects returned by GATHER_DATABASE_STATS LIST* call
      a as (
      select /*+ materialize */ rownum as rn, a.* from table(pk_stats_concurrent.list_stale_database_stats(p_gather_option)) a where objtype = 'TABLE'
      ),
      -- Assign all table, partitions and subpartitions to p_parallel_degree buckets
      concurrent_stats as (
      select ntile(p_parallel_degree) over (order by rn) as new_order, a.* from a where partname is null
      union all
      select ntile(p_parallel_degree) over (order by rn) as new_order, a.* from a where partname is not null and subpartname is null
      union all
      select ntile(p_parallel_degree) over (order by rn) as new_order, a.* from a where partname is not null and subpartname is not null
      ),
      -- Now assign a row number within each bucket
      b as (
      select c.*, row_number() over (partition by new_order order by rn) as new_rn from concurrent_stats c
      )
      -- And pick one from each bucket in turn for queuing order
      select
              ownname
            , objname as tabname
            , coalesce(subpartname, partname) as partname
            , n_dop as degree
            , case when partname is null then 'GLOBAL' when partname is not null and subpartname is null then 'PARTITION' else 'SUBPARTITION' end as granularity
      from
              b
      order by
              new_rn, new_order
    ) loop
      payload.ownname     := rec.ownname;
      payload.tabname     := rec.tabname;
      payload.partname    := rec.partname;
      payload.degree      := rec.degree;
      payload.granularity := rec.granularity;
      -- TODO: Enqueue via array using ENQUEUE_ARRAY
      execute immediate '
      declare
        eopt          dbms_aq.enqueue_options_t;
        mprop         dbms_aq.message_properties_t;
      begin
        dbms_aq.enqueue(
            queue_name => ''' || G_QUEUE_NAME || ''',
            enqueue_options => eopt,
            message_properties => mprop,
            payload => :payload,
            msgid => :enq_msgid);
      end;'
      using payload, out enq_msgid;
    end loop;
    commit;
  end populate_queue;

  /**
   * This gets called for every stats thread
   * It pulls the object to gather from the AQ queue
   * @param p_optional_init Optionally a SQL can be passed usually used to initialize the session
            for example forcing a particular parallel degree
   **/
  procedure stats_thread(
    p_optional_init         in varchar2 default null
  )
  as
    deq_msgid             RAW(16);
    payload               stats_concurrent_info;
    no_messages           exception;
    pragma exception_init(no_messages, -25228);
    s_sql                 clob;
  begin
    if p_optional_init is not null then
      execute(null, p_optional_init);
    end if;

    -- Setting VISIBILITY to IMMEDIATE makes each dequeue its own committed
    -- "queue transaction". This means that a STOP_QUEUE call with the WAIT option
    -- is able to stop the queue while the processing takes place,
    -- and that the queue table can be monitored for progress
    loop
      begin
        execute immediate '
        declare
          dopt                  dbms_aq.dequeue_options_t;
          mprop                 dbms_aq.message_properties_t;
        begin
          dopt.visibility := dbms_aq.IMMEDIATE;
          dopt.wait := dbms_aq.NO_WAIT;
          dbms_aq.dequeue(
              queue_name => ''' || G_QUEUE_NAME || ''',
              dequeue_options => dopt,
              message_properties => mprop,
              payload => :payload,
              msgid => :deq_msgid);
        end;'
        using out payload, out deq_msgid;
        s_sql := '
begin
  dbms_stats.gather_table_stats(
    ownname     => ''' || payload.ownname     || '''
  , tabname     => ''' || payload.tabname     || '''
  , partname    => ''' || payload.partname    || '''
  , degree      => ' || payload.degree      || '
  , granularity => ''' || payload.granularity || '''
  );
end;
';
        -- Execute the command
        log(s_sql, 'Ownname: ' || payload.ownname || ' Tabname: ' || payload.tabname || ' Partname: ' || payload.partname || ' Degree: ' || payload.degree || ' Granularity: ' || payload.granularity);
        begin
          execute_autonomous(payload.ownname, s_sql);
        exception
        /*
        when object_already_exists then
          null;
        when object_does_not_exist then
          null;
        */
        when others then
          -- Any error is ignored here, so a single failing object does not terminate the thread
          null;
        end;
      exception
      when no_messages then
        exit;
      end;
    end loop;
    commit;
  end stats_thread;

  /**
   * The main entry point to gather statistics via parallel threads / AQ
   * @param p_parallel_degree The number of threads to start. G_AUTO_PARALLEL_DEGREE means: use the CPU_COUNT parameter (but not the
   *        CLUSTER_DATABASE_INSTANCES parameter, commented out below) to determine the number of threads automatically
   * @param p_intra_degree The DOP to use per stats operation; by default the DOP is calculated based on CPU_COUNT and the number of threads running concurrently
   * @param p_service Specify a service if you want the jobs to be assigned to that particular service, default NULL
   * @param p_gather_option The option to pass to GATHER_DATABASE_STATS, default LIST AUTO
   * @param p_optional_init Optional SQL statement, usually used to initialize the session,
   *        for example to force a particular parallel degree
   **/
  procedure stats_concurrent(
    p_parallel_degree in integer  default G_AUTO_PARALLEL_DEGREE
  , p_intra_degree    in integer  default null
  , p_service         in varchar2 default null
  , p_gather_option   in varchar2 default 'LIST AUTO'
  , p_optional_init   in varchar2 default null
  )
  as
    n_cpu_count          binary_integer;
    n_instance_count     binary_integer;
    n_thread_count       binary_integer;
    strval               varchar2(256);
    partyp               binary_integer;
    e_job_class_exists   exception;
    pragma exception_init(e_job_class_exists, -27477);
    s_job_class          constant varchar2(30) := 'CONC_STATS';
  begin
    -- Truncate the log table
    execute immediate 'truncate table stats_concurrent_log';
    -- Just in case something has been left over from a previous run
    teardown_aq;
    setup_aq;
    -- Populate the queue
    populate_queue(p_parallel_degree, p_intra_degree, p_gather_option);
    -- Determine auto degree of parallelism
    partyp := dbms_utility.get_parameter_value('cpu_count', n_cpu_count, strval);
    partyp := dbms_utility.get_parameter_value('cluster_database_instances', n_instance_count, strval);
    n_thread_count := nvl(p_parallel_degree, n_cpu_count/* * n_instance_count*/);
    -- Create/use a common job class, makes job handling easier and allows binding to a specific service
    begin
      dbms_scheduler.create_job_class(s_job_class);
    exception
    when e_job_class_exists then
      null;
    end;
    -- Assign jobs to a particular service if requested
    if p_service is null then
      dbms_scheduler.set_attribute_null('SYS.' || s_job_class, 'SERVICE');
    else
      dbms_scheduler.set_attribute('SYS.' || s_job_class, 'SERVICE', p_service);
    end if;
    -- Submit the jobs
    for i in 1..n_thread_count loop
      dbms_scheduler.create_job(
        job_name   => s_job_class || '_' || i
      , job_type   => 'PLSQL_BLOCK'
      , job_class  => s_job_class
      , enabled    => true
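        -- Double any single quotes in p_optional_init so the generated PL/SQL block stays valid
      , job_action => 'begin dbms_session.set_role(''ALL''); pk_stats_concurrent.stats_thread(''' || replace(p_optional_init, '''', '''''') || '''); end;'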
      );
    end loop;
    -- Just in case anyone wants to use DBMS_JOB instead we need to commit the DBMS_JOB.SUBMIT
    commit;
    --execute immediate 'begin dbms_lock.sleep(:p_sleep_seconds); end;' using p_sleep_seconds;
    --teardown_aq;
  end stats_concurrent;
end pk_stats_concurrent;
/

show errors

spool off
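
Once the package is installed, a run could be started and monitored roughly like this. This is only a sketch: the parameter values (four threads, serial gathering per object) are arbitrary examples, it assumes the package was installed in the current schema so that the scheduler jobs show up in USER_SCHEDULER_JOBS, and it uses "select *" on the log table because it makes no assumption about the column names.

```sql
-- Start a run with 4 concurrent threads, each gathering with DOP 1 (example values only)
begin
  pk_stats_concurrent.stats_concurrent(
    p_parallel_degree => 4
  , p_intra_degree    => 1
  , p_gather_option   => 'LIST AUTO'
  );
end;
/

-- Jobs of the current run that still exist (job names are built from the CONC_STATS prefix)
select job_name, state
from   user_scheduler_jobs
where  job_name like 'CONC_STATS%';

-- What has been processed so far; the log table is truncated at the start of each run
select * from stats_concurrent_log;
```

Since STATS_CONCURRENT only submits the jobs and returns, the two queries can be re-run from the same session to follow the progress.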
