This is the third part of the video tutorial "Analysing Parallel
Execution Skew". In this part I show how to analyse a parallel SQL execution regarding Parallel Execution Skew.
If you don't have a Diagnostics / Tuning Pack license, the options you have for doing that are quite limited, and the approach, as demonstrated in the tutorial, has several limitations and shortcomings.
Here is a link to the video on my YouTube channel.
If you want to reproduce or play around with the examples shown in the tutorial, here is the script for creating the tables and running the queries / DML commands used in the tutorial. A shout-out goes to Christo Kutrovsky at Pythian, who I think was the one who inspired the beautified version of the V$PQ_TQSTAT query.
---------------------
-- Links for S-ASH --
---------------------
--
-- http://www.perfvision.com/ash.php
-- http://www.pythian.com/blog/trying-out-s-ash/
-- http://sourceforge.net/projects/orasash/files/v2.3/
-- http://sourceforge.net/projects/ashv/
---------------------
-- Table creation
--
-- Remove the demo tables left over from a previous run.
-- DROP TABLE ... PURGE drops the table and releases its space in one step
-- instead of moving it to the recycle bin. A separate PURGE TABLE by original
-- name purges the entry of that name that has been in the recycle bin the
-- longest, which is not necessarily the table that was just dropped, and it
-- raises an error when the recycle bin is disabled.
-- An ORA-00942 from a DROP simply means the table did not exist yet.
set echo on timing on time on
drop table t_1 purge;
drop table t_2 purge;
drop table t_1_part purge;
drop table t_2_part purge;
drop table t1 purge;
drop table t2 purge;
drop table t3 purge;
drop table t4 purge;
drop table t5 purge;
drop table x purge;
-- t1: 2,000 rows (1,000 x 2 row generators joined without a predicate), even ids
CREATE TABLE t1
AS
SELECT /*+ use_nl(a b) */
       (rownum * 2)   AS id,
       rownum         AS id2,
       rpad('x', 100) AS filler
FROM   (
         SELECT /*+ cardinality(1000) */ *
         FROM   dual
         CONNECT BY level <= 1000
       ) a,
       (
         SELECT /*+ cardinality(2) */ *
         FROM   dual
         CONNECT BY level <= 2
       ) b;
-- Gather optimizer statistics in the current schema (owner passed as null)
BEGIN
  dbms_stats.gather_table_stats(null, 't1');
END;
/
ALTER TABLE t1 CACHE;
-- t2: 2,000,000 rows (1,000,000 x 2 row generators), odd ids, id2 cycling through 1 to 2000, stored compressed
CREATE TABLE t2
COMPRESS
AS
SELECT (rownum * 2) + 1      AS id,
       mod(rownum, 2000) + 1 AS id2,
       rpad('x', 100)        AS filler
FROM   (
         SELECT /*+ cardinality(1000000) */ *
         FROM   dual
         CONNECT BY level <= 1000000
       ) a,
       (
         SELECT /*+ cardinality(2) */ *
         FROM   dual
         CONNECT BY level <= 2
       ) b;
-- Gather optimizer statistics in the current schema (owner passed as null)
BEGIN
  dbms_stats.gather_table_stats(null, 't2');
END;
/
ALTER TABLE t2 CACHE;
-- t3: built exactly like t1 - 2,000 rows with even ids
CREATE TABLE t3
AS
SELECT /*+ use_nl(a b) */
       (rownum * 2)   AS id,
       rownum         AS id2,
       rpad('x', 100) AS filler
FROM   (
         SELECT /*+ cardinality(1000) */ *
         FROM   dual
         CONNECT BY level <= 1000
       ) a,
       (
         SELECT /*+ cardinality(2) */ *
         FROM   dual
         CONNECT BY level <= 2
       ) b;
-- Gather optimizer statistics in the current schema (owner passed as null)
BEGIN
  dbms_stats.gather_table_stats(null, 't3');
END;
/
ALTER TABLE t3 CACHE;
-- t4: built exactly like t2 - 2,000,000 rows, odd ids, id2 cycling through 1 to 2000, stored compressed
CREATE TABLE t4
COMPRESS
AS
SELECT (rownum * 2) + 1      AS id,
       mod(rownum, 2000) + 1 AS id2,
       rpad('x', 100)        AS filler
FROM   (
         SELECT /*+ cardinality(1000000) */ *
         FROM   dual
         CONNECT BY level <= 1000000
       ) a,
       (
         SELECT /*+ cardinality(2) */ *
         FROM   dual
         CONNECT BY level <= 2
       ) b;
-- Gather optimizer statistics in the current schema (owner passed as null)
BEGIN
  dbms_stats.gather_table_stats(null, 't4');
END;
/
ALTER TABLE t4 CACHE;
-- t5: built exactly like t1 - 2,000 rows with even ids
CREATE TABLE t5
AS
SELECT /*+ use_nl(a b) */
       (rownum * 2)   AS id,
       rownum         AS id2,
       rpad('x', 100) AS filler
FROM   (
         SELECT /*+ cardinality(1000) */ *
         FROM   dual
         CONNECT BY level <= 1000
       ) a,
       (
         SELECT /*+ cardinality(2) */ *
         FROM   dual
         CONNECT BY level <= 2
       ) b;
-- Gather optimizer statistics in the current schema (owner passed as null)
BEGIN
  dbms_stats.gather_table_stats(null, 't5');
END;
/
ALTER TABLE t5 CACHE;
-- x: empty, compressed copy of the t2 column layout (the 1 = 2 filter selects no rows)
CREATE TABLE x
COMPRESS
AS
SELECT *
FROM   t2
WHERE  1 = 2;
CREATE UNIQUE INDEX x_idx1 ON x (id);
-- Default parallel degrees: 2 for t1 and t2, 15 for t3, t4 and t5
ALTER TABLE t1 PARALLEL 2;
ALTER TABLE t2 PARALLEL 2;
ALTER TABLE t3 PARALLEL 15;
ALTER TABLE t4 PARALLEL 15;
ALTER TABLE t5 PARALLEL 15;
-- t_1: 2,000,000 rows (100,000 x 20 row generators) with id = rownum, stored compressed
CREATE TABLE t_1
COMPRESS
AS
SELECT /*+ use_nl(a b) */
       rownum         AS id,
       rpad('x', 100) AS filler
FROM   (
         SELECT /*+ cardinality(1e5) */ *
         FROM   dual
         CONNECT BY level <= 1e5
       ) a,
       (
         SELECT /*+ cardinality(20) */ *
         FROM   dual
         CONNECT BY level <= 20
       ) b;
BEGIN
  dbms_stats.gather_table_stats(null, 't_1');
END;
/
-- t_2: same row count as t_1 plus the skewed column fk_id_skew
-- The first 500,000 rows get fk_id_skew = rownum + 1, every remaining row gets the value 1
CREATE TABLE t_2
COMPRESS
AS
SELECT rownum         AS id,
       CASE
         WHEN rownum <= 5e5 THEN mod(rownum, 2e6) + 1
         ELSE 1
       END            AS fk_id_skew,
       rpad('x', 100) AS filler
FROM   (
         SELECT /*+ cardinality(1e5) */ *
         FROM   dual
         CONNECT BY level <= 1e5
       ) a,
       (
         SELECT /*+ cardinality(20) */ *
         FROM   dual
         CONNECT BY level <= 20
       ) b;
-- SIZE 1 requests statistics without histograms on any column
BEGIN
  dbms_stats.gather_table_stats(null, 't_2', method_opt=>'for all columns size 1', no_invalidate=>false);
END;
/
ALTER TABLE t_1 PARALLEL 8 CACHE;
ALTER TABLE t_2 PARALLEL 8 CACHE;
-- t_1_part: 2,000,000 rows with id = rownum, hash partitioned on id into 8 partitions, stored compressed
CREATE TABLE t_1_part
PARTITION BY HASH (id) PARTITIONS 8
COMPRESS
AS
SELECT /*+ use_nl(a b) */
       rownum         AS id,
       rpad('x', 100) AS filler
FROM   (
         SELECT /*+ cardinality(1e5) */ *
         FROM   dual
         CONNECT BY level <= 1e5
       ) a,
       (
         SELECT /*+ cardinality(20) */ *
         FROM   dual
         CONNECT BY level <= 20
       ) b;
BEGIN
  dbms_stats.gather_table_stats(null, 't_1_part');
END;
/
-- t_2_part: same data as t_2, hash partitioned on the skewed column fk_id_skew
-- All rows sharing fk_id_skew = 1 therefore end up in the same hash partition
CREATE TABLE t_2_part
PARTITION BY HASH (fk_id_skew) PARTITIONS 8
COMPRESS
AS
SELECT rownum         AS id,
       CASE
         WHEN rownum <= 5e5 THEN mod(rownum, 2e6) + 1
         ELSE 1
       END            AS fk_id_skew,
       rpad('x', 100) AS filler
FROM   (
         SELECT /*+ cardinality(1e5) */ *
         FROM   dual
         CONNECT BY level <= 1e5
       ) a,
       (
         SELECT /*+ cardinality(20) */ *
         FROM   dual
         CONNECT BY level <= 20
       ) b;
-- SIZE 1 requests statistics without histograms on any column
BEGIN
  dbms_stats.gather_table_stats(null, 't_2_part', method_opt=>'for all columns size 1', no_invalidate=>false);
END;
/
ALTER TABLE t_1_part PARALLEL 8 CACHE;
ALTER TABLE t_2_part PARALLEL 8 CACHE;
---------------------------------------------------------------
-- Single DFO tree (with Parallel Execution Skew), many DFOs --
---------------------------------------------------------------
-- SQL*Plus environment for this demo, num_cpu is the degree substituted into the parallel() hints below
set echo on timing on time on verify on
define num_cpu = "14"
-- Manual workarea sizing with 200000000 byte sort and hash areas
-- NOTE(review): every size is set twice, presumably on purpose (so that the value reliably reaches the parallel execution servers) - confirm before removing the duplicates
alter session set workarea_size_policy = manual;
alter session set sort_area_size = 200000000;
alter session set sort_area_size = 200000000;
alter session set hash_area_size = 200000000;
alter session set hash_area_size = 200000000;
-- Demo query: joins the non-mergeable inline views v_1 and v_5 and then outer joins table t1
-- Inside both views the alias t2 refers to table t1 and the alias t3 refers to table t2
-- v_1 outer joins them on id2, v_5 on id, and both keep only rows where mod(t3.id2, 3) = 0
-- The hints request hash joins in the order v_1, v_5, t1 with HASH HASH distribution
-- The regexp_replace() expressions are part of the join predicates and the projection, the outer query returns max() aggregates only
select
max(t1_id)
, max(t1_filler)
, max(t2_id)
, max(t2_filler)
, max(t3_id)
, max(t3_filler)
from (
select /*+ monitor
no_merge
no_merge(v_1)
no_merge(v_5)
parallel(t1 &num_cpu)
PQ_DISTRIBUTE(T1 HASH HASH)
PQ_DISTRIBUTE(V_5 HASH HASH)
leading (v_1 v_5 t1)
use_hash(v_1 v_5 t1)
swap_join_inputs(t1)
*/
t1.id as t1_id
, regexp_replace(v_5.t3_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') as t1_filler
, v_5.*
from (
select /*+ parallel(t2 &num_cpu)
parallel(t3 &num_cpu)
leading(t3 t2)
use_hash(t3 t2)
swap_join_inputs(t2)
PQ_DISTRIBUTE(T2 HASH HASH)
*/
t2.id as t2_id
, t2.filler as t2_filler
, t2.id2 as t2_id2
, t3.id as t3_id
, t3.filler as t3_filler
from
t1 t2
, t2 t3
where
t3.id2 = t2.id2 (+)
and regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
and mod(t3.id2, 3) = 0
) v_1
, (
select /*+ parallel(t2 &num_cpu)
parallel(t3 &num_cpu)
leading(t3 t2)
use_hash(t3 t2)
swap_join_inputs(t2)
PQ_DISTRIBUTE(T2 HASH HASH)
*/
t2.id as t2_id
, t2.filler as t2_filler
, t2.id2 as t2_id2
, t3.id as t3_id
, t3.filler as t3_filler
from
t1 t2
, t2 t3
where
t3.id = t2.id (+)
and regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
and mod(t3.id2, 3) = 0
) v_5
, t1
where
v_1.t3_id = v_5.t3_id
and v_5.t2_id2 = t1.id2 (+) + 2001
and regexp_replace(v_5.t3_filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t1.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
)
;
-- Report the row distribution recorded in V$PQ_TQSTAT per table queue, server type and process
-- BREAK suppresses repeated values and prints one blank line whenever server_type changes
BREAK ON dfo_number NODUP ON tq_id NODUP ON server_type SKIP 1 NODUP ON instance NODUP
-- Optional total per server_type (disabled): compute sum label Total of num_rows on server_type
SELECT
    /* dfo_number, */
    tq_id,
    CAST(server_type AS VARCHAR2(10)) AS server_type,
    instance,
    CAST(process AS VARCHAR2(8)) AS process,
    num_rows,
    -- percentage of the rows of this DFO, table queue and server type handled by the process
    ROUND(RATIO_TO_REPORT(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type) * 100) AS "%",
    -- text bar, ten characters wide for the process with the most rows in the group
    CAST(
        RPAD('#', ROUND(num_rows * 10 / NULLIF(MAX(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type), 0)), '#')
        AS VARCHAR2(10)
    ) AS graph,
    ROUND(bytes / 1024 / 1024) AS mb,
    ROUND(bytes / NULLIF(num_rows, 0)) AS "bytes/row"
FROM v$pq_tqstat
ORDER BY
    dfo_number,
    tq_id,
    server_type DESC,
    instance,
    process;
---------------------------------------------------------------------------------------------------
-- Same statement with Parallel TEMP TABLE TRANSFORMATION, V$PQ_TQSTAT shows useless information --
---------------------------------------------------------------------------------------------------
-- SQL*Plus environment for this demo, num_cpu is the degree substituted into the parallel() hints below
set echo on timing on time on verify on
define num_cpu = "14"
-- Manual workarea sizing with 200000000 byte sort and hash areas
-- NOTE(review): every size is set twice, presumably on purpose (so that the value reliably reaches the parallel execution servers) - confirm before removing the duplicates
alter session set workarea_size_policy = manual;
alter session set sort_area_size = 200000000;
alter session set sort_area_size = 200000000;
alter session set hash_area_size = 200000000;
alter session set hash_area_size = 200000000;
-- Demo query: the join of v_1, v_5 and t1 is wrapped in the WITH clause "result" carrying a materialize hint
-- Besides materialize, the hint block of "result" also contains PQ_DISTRIBUTE(V_1 HASH HASH)
-- Inside both views the alias t2 refers to table t1 and the alias t3 refers to table t2
-- The max() aggregates are computed by the main query on top of "result"
with result as
(
select /*+ materialize
monitor
no_merge
no_merge(v_1)
no_merge(v_5)
parallel(t1 &num_cpu)
PQ_DISTRIBUTE(T1 HASH HASH)
PQ_DISTRIBUTE(V_1 HASH HASH)
PQ_DISTRIBUTE(V_5 HASH HASH)
leading (v_1 v_5 t1)
use_hash(v_1 v_5 t1)
swap_join_inputs(t1)
*/
t1.id as t1_id
, regexp_replace(v_5.t3_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') as t1_filler
, v_5.*
from (
select /*+ parallel(t2 &num_cpu) parallel(t3 &num_cpu) leading(t3 t2) use_hash(t3 t2) swap_join_inputs(t2) PQ_DISTRIBUTE(T2 HASH HASH) */
t2.id as t2_id
, t2.filler as t2_filler
, t2.id2 as t2_id2
, t3.id as t3_id
, t3.filler as t3_filler
from
t1 t2
, t2 t3
where
t3.id2 = t2.id2 (+)
and regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
and mod(t3.id2, 3) = 0
)
v_1
, (
select /*+ parallel(t2 &num_cpu) parallel(t3 &num_cpu) leading(t3 t2) use_hash(t3 t2) swap_join_inputs(t2) PQ_DISTRIBUTE(T2 HASH HASH) */
t2.id as t2_id
, t2.filler as t2_filler
, t2.id2 as t2_id2
, t3.id as t3_id
, t3.filler as t3_filler
from
t1 t2
, t2 t3
where
t3.id = t2.id (+)
and regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
and mod(t3.id2, 3) = 0
) v_5
, t1
where
v_1.t3_id = v_5.t3_id
and v_5.t2_id2 = t1.id2 (+) + 2001
and regexp_replace(v_5.t3_filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t1.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
)
select max(t1_id), max(t1_filler), max(t2_id), max(t2_filler), max(t3_id), max(t3_filler) from
result;
-- Report the row distribution recorded in V$PQ_TQSTAT per table queue, server type and process
-- BREAK suppresses repeated values and prints one blank line whenever server_type changes
BREAK ON dfo_number NODUP ON tq_id NODUP ON server_type SKIP 1 NODUP ON instance NODUP
-- Optional total per server_type (disabled): compute sum label Total of num_rows on server_type
SELECT
    /* dfo_number, */
    tq_id,
    CAST(server_type AS VARCHAR2(10)) AS server_type,
    instance,
    CAST(process AS VARCHAR2(8)) AS process,
    num_rows,
    -- percentage of the rows of this DFO, table queue and server type handled by the process
    ROUND(RATIO_TO_REPORT(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type) * 100) AS "%",
    -- text bar, ten characters wide for the process with the most rows in the group
    CAST(
        RPAD('#', ROUND(num_rows * 10 / NULLIF(MAX(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type), 0)), '#')
        AS VARCHAR2(10)
    ) AS graph,
    ROUND(bytes / 1024 / 1024) AS mb,
    ROUND(bytes / NULLIF(num_rows, 0)) AS "bytes/row"
FROM v$pq_tqstat
ORDER BY
    dfo_number,
    tq_id,
    server_type DESC,
    instance,
    process;
--------------------------------------------------------------------------------------------------
-- This construct results in misleading information from V$PQ_TQSTAT (actually a complete mess) --
--------------------------------------------------------------------------------------------------
-- Parallel DML has to be enabled at session level before the insert below, x is emptied first
set echo on timing on time on
alter session enable parallel dml;
truncate table x;
-- Demo statement: insert with append and parallel(x 4) hints into x, fed by a join of two inline views over t2
-- Both innermost views assign rownum as id and are hinted no_merge, v1 reads t2 with parallel degree 4 and v2 with degree 8
-- The regexp_replace() predicate compares the case sensitive ('c') and case insensitive ('i') result for the same column
-- NOTE(review): the leading() hint names v1 v2, the aliases of the innermost views, while the views joined at this level are aliased v_1 and v_2 - confirm this is intended
insert /*+ append parallel(x 4) */ into x
select /*+ leading(v1 v2) optimizer_features_enable('11.2.0.1') */
v_1.id
, v_1.id2
, v_1.filler
from (
select
id
, id2
, filler
from (
select /*+ parallel(t2 4) no_merge */
rownum as id
, t2.id2
, t2.filler
from
t2
where
mod(t2.id2, 3) = 0
and regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
) v1
) v_1
, (
select
id
, id2
, filler
from (
select /*+ parallel(t2 8) no_merge */
rownum as id
, t2.id2
, t2.filler
from
t2
where
mod(t2.id2, 3) = 0
and regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
) v2
) v_2
where
v_1.id = v_2.id
and v_1.filler = v_2.filler
;
-- Parallel DML requires a COMMIT before querying V$PQ_TQSTAT
commit;
-- Report the row distribution recorded in V$PQ_TQSTAT per DFO tree, table queue, server type and process
-- BREAK suppresses repeated values and prints one blank line whenever server_type changes
-- COMPUTE adds a row count total labelled Total for every server_type group
BREAK ON dfo_number NODUP ON tq_id NODUP ON server_type SKIP 1 NODUP ON instance NODUP
COMPUTE SUM LABEL Total OF num_rows ON server_type
SELECT
    dfo_number,
    tq_id,
    CAST(server_type AS VARCHAR2(10)) AS server_type,
    instance,
    CAST(process AS VARCHAR2(8)) AS process,
    num_rows,
    -- percentage of the rows of this DFO, table queue and server type handled by the process
    ROUND(RATIO_TO_REPORT(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type) * 100) AS "%",
    -- text bar, ten characters wide for the process with the most rows in the group
    CAST(
        RPAD('#', ROUND(num_rows * 10 / NULLIF(MAX(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type), 0)), '#')
        AS VARCHAR2(10)
    ) AS graph,
    ROUND(bytes / 1024 / 1024) AS mb,
    ROUND(bytes / NULLIF(num_rows, 0)) AS "bytes/row"
FROM v$pq_tqstat
ORDER BY
    dfo_number,
    tq_id,
    server_type DESC,
    instance,
    process;
----------------------------------------------------------------------
-- Single DFO tree (with Parallel Execution Skew, almost no impact) --
----------------------------------------------------------------------
-- SQL*Plus environment for this demo
set echo on timing on time on
-- Manual workarea sizing with 500000000 byte sort and hash areas
-- NOTE(review): every size is set twice, presumably on purpose (so that the value reliably reaches the parallel execution servers) - confirm before removing the duplicates
alter session set workarea_size_policy = manual;
alter session set sort_area_size = 500000000;
alter session set sort_area_size = 500000000;
alter session set hash_area_size = 500000000;
alter session set hash_area_size = 500000000;
-- Demo query: the non-mergeable view v1 joins t_1 to t_2 on the skewed column fk_id_skew, its result is joined to t_1 again on v1.t_2_id
-- The hints request hash joins with HASH HASH distribution and without swapping the join inputs, for both joins
-- The regexp_replace() comparisons on the filler columns are additional join predicates, the query returns max() aggregates only
select /*+ leading(v1)
use_hash(t_1)
no_swap_join_inputs(t_1)
pq_distribute(t_1 hash hash)
*/
max(t_1.filler)
, max(v1.t_1_filler)
, max(v1.t_2_filler)
from
t_1
, (
select /*+ no_merge
leading(t_1 t_2)
use_hash(t_2)
no_swap_join_inputs(t_2)
pq_distribute(t_2 hash hash) */
t_1.id as t_1_id
, t_1.filler as t_1_filler
, t_2.id as t_2_id
, t_2.filler as t_2_filler
from t_1
, t_2
where
t_2.fk_id_skew = t_1.id
) v1
where
v1.t_2_id = t_1.id
and regexp_replace(v1.t_2_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
and regexp_replace(v1.t_2_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
;
-- Report the row distribution recorded in V$PQ_TQSTAT per table queue, server type and process
-- BREAK suppresses repeated values and prints one blank line whenever server_type changes
BREAK ON dfo_number NODUP ON tq_id NODUP ON server_type SKIP 1 NODUP ON instance NODUP
-- Optional total per server_type (disabled): compute sum label Total of num_rows on server_type
SELECT
    /* dfo_number, */
    tq_id,
    CAST(server_type AS VARCHAR2(10)) AS server_type,
    instance,
    CAST(process AS VARCHAR2(8)) AS process,
    num_rows,
    -- percentage of the rows of this DFO, table queue and server type handled by the process
    ROUND(RATIO_TO_REPORT(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type) * 100) AS "%",
    -- text bar, ten characters wide for the process with the most rows in the group
    CAST(
        RPAD('#', ROUND(num_rows * 10 / NULLIF(MAX(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type), 0)), '#')
        AS VARCHAR2(10)
    ) AS graph,
    ROUND(bytes / 1024 / 1024) AS mb,
    ROUND(bytes / NULLIF(num_rows, 0)) AS "bytes/row"
FROM v$pq_tqstat
ORDER BY
    dfo_number,
    tq_id,
    server_type DESC,
    instance,
    process;
--------------------------------------------------------------------------------------------------------------------------------
-- Full Partition Wise Join with partition skew - V$PQ_TQSTAT is of no help, since no redistribution takes place (single DFO) --
--------------------------------------------------------------------------------------------------------------------------------
-- SQL*Plus environment for this demo
set echo on timing on time on
-- Manual workarea sizing with 500000000 byte sort and hash areas
-- NOTE(review): every size is set twice, presumably on purpose (so that the value reliably reaches the parallel execution servers) - confirm before removing the duplicates
alter session set workarea_size_policy = manual;
alter session set sort_area_size = 500000000;
alter session set sort_area_size = 500000000;
alter session set hash_area_size = 500000000;
alter session set hash_area_size = 500000000;
-- Demo query: joins the hash partitioned tables t_1_part (alias t_1) and t_2_part (alias t_2) on t_2.fk_id_skew = t_1.id
-- pq_distribute(t_2 none none) requests a join without redistributing either row source
-- The regexp_replace() comparison on the filler columns is an additional join predicate, the outer query only counts t_2_filler
select count(t_2_filler) from (
select /*+ monitor
leading(t_1 t_2)
use_hash(t_2)
no_swap_join_inputs(t_2)
pq_distribute(t_2 none none)
*/
t_1.id as t_1_id
, t_1.filler as t_1_filler
, t_2.id as t_2_id
, t_2.filler as t_2_filler
from t_1_part t_1
, t_2_part t_2
where
t_2.fk_id_skew = t_1.id
and regexp_replace(t_2.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
);
-- Report the row distribution recorded in V$PQ_TQSTAT per table queue, server type and process
-- BREAK suppresses repeated values and prints one blank line whenever server_type changes
BREAK ON dfo_number NODUP ON tq_id NODUP ON server_type SKIP 1 NODUP ON instance NODUP
-- Optional total per server_type (disabled): compute sum label Total of num_rows on server_type
SELECT
    /* dfo_number, */
    tq_id,
    CAST(server_type AS VARCHAR2(10)) AS server_type,
    instance,
    CAST(process AS VARCHAR2(8)) AS process,
    num_rows,
    -- percentage of the rows of this DFO, table queue and server type handled by the process
    ROUND(RATIO_TO_REPORT(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type) * 100) AS "%",
    -- text bar, ten characters wide for the process with the most rows in the group
    CAST(
        RPAD('#', ROUND(num_rows * 10 / NULLIF(MAX(num_rows) OVER (PARTITION BY dfo_number, tq_id, server_type), 0)), '#')
        AS VARCHAR2(10)
    ) AS graph,
    ROUND(bytes / 1024 / 1024) AS mb,
    ROUND(bytes / NULLIF(num_rows, 0)) AS "bytes/row"
FROM v$pq_tqstat
ORDER BY
    dfo_number,
    tq_id,
    server_type DESC,
    instance,
    process;