This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 9dd0abbb373c0256a948cdf54a1a677230e7f5cb Author: Aman Sinha <amsi...@cloudera.com> AuthorDate: Thu Dec 10 23:20:05 2020 -0800 IMPALA-10287: Include parallelism in cost comparison of broadcast vs partition The current planner tends to pick broadcast distribution in some cases even when partition distribution would be more optimal (seen in TPC-DS performance runs). This patch adds 2 query options: - use_dop_for_costing (type:boolean, default:true) - broadcast_to_partition_factor (type:double, default:1.0) With use_dop_for_costing enabled, the distributed planner will multiply the cost of the broadcast join's build side by C * sqrt(m), where m = the degree of parallelism of the join node and C = the broadcast_to_partition_factor. This allows the planner to more favorably consider partition distribution where appropriate. The choice of sqrt in the calculation is not a final choice at this point but is intended to model a non-linear relationship between mt_dop and the query performance. After further performance testing with tuning the above factor, we can establish a better correlation and refine the formula (tracked by IMPALA-10395). Testing: - Added a new test file with TPC-DS Q78 which shows partition distribution for a left-outer join (with store_returns on the right input) in the query when the query options are enabled (it chooses broadcast otherwise). - Ran PlannerTest and TpcdsPlannerTest. - Ran e2e tests for Tpcds and Tpch. 
Change-Id: Idff569299e5c78720ca17c616a531adac78208e1 Reviewed-on: http://gerrit.cloudera.org:8080/16864 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/service/query-options.cc | 16 + be/src/service/query-options.h | 6 +- common/thrift/ImpalaInternalService.thrift | 7 + common/thrift/ImpalaService.thrift | 11 + .../apache/impala/planner/DistributedPlanner.java | 21 +- .../org/apache/impala/planner/PlannerTest.java | 8 + .../queries/PlannerTest/tpcds-dist-method.test | 538 +++++++++++++++++++++ 7 files changed, 603 insertions(+), 4 deletions(-) diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index f2cd720..cc65f08 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -988,6 +988,22 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_report_skew_limit(skew_threshold); break; } + case TImpalaQueryOptions::USE_DOP_FOR_COSTING: { + query_options->__set_use_dop_for_costing(IsTrue(value)); + break; + } + case TImpalaQueryOptions::BROADCAST_TO_PARTITION_FACTOR: { + StringParser::ParseResult result; + const double val = + StringParser::StringToFloat<double>(value.c_str(), value.length(), &result); + if (result != StringParser::PARSE_SUCCESS || val < 0 || val > 1000) { + return Status(Substitute("Invalid broadcast to partition factor '$0'. 
" + "Only values from 0 to 1000 are allowed.", + value)); + } + query_options->__set_broadcast_to_partition_factor(val); + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index d61e47d..9abd042 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // time we add or remove a query option to/from the enum TImpalaQueryOptions. #define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::OPTIMIZE_SIMPLE_LIMIT + 1);\ + TImpalaQueryOptions::BROADCAST_TO_PARTITION_FACTOR + 1);\ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -225,6 +225,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> TQueryOptionLevel::ADVANCED)\ QUERY_OPT_FN(optimize_simple_limit, OPTIMIZE_SIMPLE_LIMIT,\ TQueryOptionLevel::REGULAR)\ + QUERY_OPT_FN(use_dop_for_costing, USE_DOP_FOR_COSTING,\ + TQueryOptionLevel::ADVANCED)\ + QUERY_OPT_FN(broadcast_to_partition_factor, BROADCAST_TO_PARTITION_FACTOR,\ + TQueryOptionLevel::ADVANCED)\ ; /// Enforce practical limits on some query options to avoid undesired query state. 
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 22309a9..d61b9eb 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -468,6 +468,13 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 117: optional bool optimize_simple_limit = false; + + // See comment in ImpalaService.thrift + 118: optional bool use_dop_for_costing = true; + + // See comment in ImpalaService.thrift + 119: optional double broadcast_to_partition_factor = 1.0; + } // Impala currently has two types of sessions: Beeswax and HiveServer2 diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 007ae27..f553d76 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -602,6 +602,17 @@ enum TImpalaQueryOptions { // This option is opt-in by default as this optimization may in some cases produce // fewer (but correct) rows than the limit value in the query. OPTIMIZE_SIMPLE_LIMIT = 116 + + // When true, the degree of parallelism (if > 1) is used in costing decision + // of a broadcast vs partition distribution. + USE_DOP_FOR_COSTING = 117 + + // A multiplying factor between 0 to 1000 that is applied to the costing decision of + // a broadcast vs partition distribution. Fractional values between 0 to 1 favor + // broadcast by reducing the build side cost of a broadcast join. Values above 1.0 + // favor partition distribution. + BROADCAST_TO_PARTITION_FACTOR = 118 + } // The summary of a DML statement. 
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 864a1c1..f284e32 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -497,7 +497,24 @@ public class DistributedPlanner { // RHS data must be broadcast once to each node. // TODO: IMPALA-9176: this is inaccurate for NAAJ until IMPALA-9176 is fixed // because it must be broadcast once per instance. - broadcastCost = 2 * rhsDataSize * leftChildNodes; + long dataPayload = rhsDataSize * leftChildNodes; + long hashTblBuildCost = dataPayload; + if (mt_dop > 1 && ctx_.getQueryOptions().use_dop_for_costing) { + // In the broadcast join a single thread per node is building the hash + // table of size N compared to the partition case where m threads are + // building hash tables of size N/m each (assuming uniform distribution). + // Hence, the build side is faster in the latter case. For relative costing, + // we multiply the hash table build cost by C sqrt(m) where m = plan node's + // parallelism and C = a coefficient that controls the function's rate of + // growth (a tunable parameter). We use the sqrt to model a non-linear + // function since the slowdown with broadcast is not exactly linear (TODO: + // more analysis is needed to establish an accurate correlation). 
+ PlanNode leftPlanRoot = leftChildFragment.getPlanRoot(); + int actual_dop = leftPlanRoot.getNumInstances()/leftPlanRoot.getNumNodes(); + hashTblBuildCost *= (long) (ctx_.getQueryOptions().broadcast_to_partition_factor + * Math.max(1.0, Math.sqrt(actual_dop))); + } + broadcastCost = dataPayload + hashTblBuildCost; } } if (LOG.isTraceEnabled()) { @@ -596,8 +613,6 @@ public class DistributedPlanner { ctx_.getQueryOptions().getDefault_join_distribution_mode()); } - int mt_dop = ctx_.getQueryOptions().mt_dop; - // Decide the distribution mode based on the estimated costs, the mem limit and // the broadcast bytes limit. The last value is a safety check to ensure we // don't broadcast very large inputs (for example in case the broadcast cost was diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index c812430..3f42933 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -1111,4 +1111,12 @@ public class PlannerTest extends PlannerTestBase { runPlannerTestFile("optimize-simple-limit", options); } + /** + * Test the distribution method for a join + */ + @Test + public void testDistributionMethod() { + runPlannerTestFile("tpcds-dist-method", "tpcds"); + } + } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-dist-method.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-dist-method.test new file mode 100644 index 0000000..72d1bb0 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-dist-method.test @@ -0,0 +1,538 @@ +# Q78 +with ws as + (select d_year AS ws_sold_year, ws_item_sk, + ws_bill_customer_sk ws_customer_sk, + sum(ws_quantity) ws_qty, + sum(ws_wholesale_cost) ws_wc, + sum(ws_sales_price) ws_sp + from web_sales + left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk + join date_dim on 
ws_sold_date_sk = d_date_sk + where wr_order_number is null + group by d_year, ws_item_sk, ws_bill_customer_sk + ), +cs as + (select d_year AS cs_sold_year, cs_item_sk, + cs_bill_customer_sk cs_customer_sk, + sum(cs_quantity) cs_qty, + sum(cs_wholesale_cost) cs_wc, + sum(cs_sales_price) cs_sp + from catalog_sales + left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk + join date_dim on cs_sold_date_sk = d_date_sk + where cr_order_number is null + group by d_year, cs_item_sk, cs_bill_customer_sk + ), +ss as + (select d_year AS ss_sold_year, ss_item_sk, + ss_customer_sk, + sum(ss_quantity) ss_qty, + sum(ss_wholesale_cost) ss_wc, + sum(ss_sales_price) ss_sp + from store_sales + left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk + join date_dim on ss_sold_date_sk = d_date_sk + where sr_ticket_number is null + group by d_year, ss_item_sk, ss_customer_sk + ) + select +ss_sold_year, ss_item_sk, ss_customer_sk, +round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio, +ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price, +coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty, +coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost, +coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price +from ss +left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk) +left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk) +where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2002 +order by + ss_sold_year, ss_item_sk, ss_customer_sk, + ss_qty desc, ss_wc desc, ss_sp desc, + other_chan_qty, + other_chan_wholesale_cost, + other_chan_sales_price, + round(ss_qty/(coalesce(ws_qty+cs_qty,1)),2) +limit 100 +---- QUERYOPTIONS +mt_dop=12 +use_dop_for_costing=true +broadcast_to_partition_factor=4.0 +---- PLAN +PLAN-ROOT SINK +| +20:TOP-N [LIMIT=100] +| order by: ss_sold_year ASC, 
ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC +| row-size=104B cardinality=100 +| +19:HASH JOIN [RIGHT OUTER JOIN] +| hash predicates: d_year = d_year, cs_bill_customer_sk = ss_customer_sk, cs_item_sk = ss_item_sk +| other predicates: (coalesce(sum(ws_quantity), 0) > 0 OR coalesce(sum(cs_quantity), 0) > 0) +| runtime filters: RF000 <- d_year, RF001 <- ss_customer_sk, RF002 <- ss_item_sk +| row-size=168B cardinality=3.00K +| +|--18:HASH JOIN [RIGHT OUTER JOIN] +| | hash predicates: d_year = d_year, ws_bill_customer_sk = ss_customer_sk, ws_item_sk = ss_item_sk +| | runtime filters: RF008 <- d_year, RF009 <- ss_customer_sk, RF010 <- ss_item_sk +| | row-size=112B cardinality=3.00K +| | +| |--05:AGGREGATE [FINALIZE] +| | | output: sum(ss_quantity), sum(ss_wholesale_cost), sum(ss_sales_price) +| | | group by: d_year, ss_item_sk, ss_customer_sk +| | | having: d_year = 2002 +| | | row-size=56B cardinality=3.00K +| | | +| | 04:HASH JOIN [INNER JOIN] +| | | hash predicates: ss_sold_date_sk = d_date_sk +| | | runtime filters: RF016 <- d_date_sk +| | | row-size=60B cardinality=589.03K +| | | +| | |--02:SCAN HDFS [tpcds.date_dim] +| | | HDFS partitions=1/1 files=1 size=9.84MB +| | | predicates: tpcds.date_dim.d_year = 2002 +| | | row-size=8B cardinality=373 +| | | +| | 03:HASH JOIN [LEFT OUTER JOIN] +| | | hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number +| | | other predicates: sr_ticket_number IS NULL +| | | row-size=52B cardinality=2.88M +| | | +| | |--01:SCAN HDFS [tpcds.store_returns] +| | | HDFS partitions=1/1 files=1 size=31.19MB +| | | row-size=16B cardinality=287.51K +| | | +| | 00:SCAN HDFS [tpcds.store_sales] +| | HDFS partitions=1824/1824 files=1824 size=346.60MB +| | runtime filters: RF016 -> ss_sold_date_sk +| | 
row-size=36B cardinality=2.88M +| | +| 11:AGGREGATE [FINALIZE] +| | output: sum(ws_quantity), sum(ws_wholesale_cost), sum(ws_sales_price) +| | group by: d_year, ws_item_sk, ws_bill_customer_sk +| | row-size=56B cardinality=148.00K +| | +| 10:HASH JOIN [INNER JOIN] +| | hash predicates: ws_sold_date_sk = d_date_sk +| | runtime filters: RF014 <- d_date_sk +| | row-size=60B cardinality=148.00K +| | +| |--08:SCAN HDFS [tpcds.date_dim] +| | HDFS partitions=1/1 files=1 size=9.84MB +| | predicates: tpcds.date_dim.d_year = 2002 +| | runtime filters: RF008 -> tpcds.date_dim.d_year +| | row-size=8B cardinality=373 +| | +| 09:HASH JOIN [LEFT OUTER JOIN] +| | hash predicates: ws_item_sk = wr_item_sk, ws_order_number = wr_order_number +| | other predicates: wr_order_number IS NULL +| | row-size=52B cardinality=719.38K +| | +| |--07:SCAN HDFS [tpcds.web_returns] +| | HDFS partitions=1/1 files=1 size=9.35MB +| | runtime filters: RF010 -> tpcds.web_returns.wr_item_sk +| | row-size=16B cardinality=71.76K +| | +| 06:SCAN HDFS [tpcds.web_sales] +| HDFS partitions=1/1 files=1 size=140.07MB +| runtime filters: RF009 -> tpcds.web_sales.ws_bill_customer_sk, RF010 -> tpcds.web_sales.ws_item_sk, RF014 -> ws_sold_date_sk +| row-size=36B cardinality=719.38K +| +17:AGGREGATE [FINALIZE] +| output: sum(cs_quantity), sum(cs_wholesale_cost), sum(cs_sales_price) +| group by: d_year, cs_item_sk, cs_bill_customer_sk +| row-size=56B cardinality=294.63K +| +16:HASH JOIN [INNER JOIN] +| hash predicates: cs_sold_date_sk = d_date_sk +| runtime filters: RF006 <- d_date_sk +| row-size=60B cardinality=294.63K +| +|--14:SCAN HDFS [tpcds.date_dim] +| HDFS partitions=1/1 files=1 size=9.84MB +| predicates: tpcds.date_dim.d_year = 2002 +| runtime filters: RF000 -> tpcds.date_dim.d_year +| row-size=8B cardinality=373 +| +15:HASH JOIN [LEFT OUTER JOIN] +| hash predicates: cs_item_sk = cr_item_sk, cs_order_number = cr_order_number +| other predicates: cr_order_number IS NULL +| row-size=52B cardinality=1.44M +| 
+|--13:SCAN HDFS [tpcds.catalog_returns] +| HDFS partitions=1/1 files=1 size=20.39MB +| runtime filters: RF002 -> tpcds.catalog_returns.cr_item_sk +| row-size=16B cardinality=144.07K +| +12:SCAN HDFS [tpcds.catalog_sales] + HDFS partitions=1/1 files=1 size=282.20MB + runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk + row-size=36B cardinality=1.44M +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +36:MERGING-EXCHANGE [UNPARTITIONED] +| order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC +| limit: 100 +| +20:TOP-N [LIMIT=100] +| order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC +| row-size=104B cardinality=100 +| +19:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED] +| hash predicates: d_year = d_year, cs_bill_customer_sk = ss_customer_sk, cs_item_sk = ss_item_sk +| other predicates: (coalesce(sum(ws_quantity), 0) > 0 OR coalesce(sum(cs_quantity), 0) > 0) +| row-size=168B cardinality=3.00K +| +|--JOIN BUILD +| | join-table-id=00 plan-id=01 cohort-id=01 +| | build expressions: d_year, ss_customer_sk, ss_item_sk +| | runtime filters: RF000 <- d_year, RF001 <- ss_customer_sk, RF002 <- ss_item_sk +| | +| 35:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)] +| | +| 18:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED] +| | hash predicates: d_year = d_year, ws_bill_customer_sk = ss_customer_sk, ws_item_sk = ss_item_sk +| | row-size=112B cardinality=3.00K +| | +| |--JOIN BUILD +| | | join-table-id=01 plan-id=02 cohort-id=02 +| | | build 
expressions: d_year, ss_customer_sk, ss_item_sk +| | | runtime filters: RF008 <- d_year, RF009 <- ss_customer_sk, RF010 <- ss_item_sk +| | | +| | 34:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)] +| | | +| | 33:AGGREGATE [FINALIZE] +| | | output: sum:merge(ss_quantity), sum:merge(ss_wholesale_cost), sum:merge(ss_sales_price) +| | | group by: d_year, ss_item_sk, ss_customer_sk +| | | having: d_year = 2002 +| | | row-size=56B cardinality=3.00K +| | | +| | 32:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)] +| | | +| | 05:AGGREGATE [STREAMING] +| | | output: sum(ss_quantity), sum(ss_wholesale_cost), sum(ss_sales_price) +| | | group by: d_year, ss_item_sk, ss_customer_sk +| | | row-size=56B cardinality=589.03K +| | | +| | 04:HASH JOIN [INNER JOIN, BROADCAST] +| | | hash predicates: ss_sold_date_sk = d_date_sk +| | | row-size=60B cardinality=589.03K +| | | +| | |--JOIN BUILD +| | | | join-table-id=02 plan-id=03 cohort-id=03 +| | | | build expressions: d_date_sk +| | | | runtime filters: RF016 <- d_date_sk +| | | | +| | | 31:EXCHANGE [BROADCAST] +| | | | +| | | 02:SCAN HDFS [tpcds.date_dim] +| | | HDFS partitions=1/1 files=1 size=9.84MB +| | | predicates: tpcds.date_dim.d_year = 2002 +| | | row-size=8B cardinality=373 +| | | +| | 03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED] +| | | hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number +| | | other predicates: sr_ticket_number IS NULL +| | | row-size=52B cardinality=2.88M +| | | +| | |--JOIN BUILD +| | | | join-table-id=03 plan-id=04 cohort-id=03 +| | | | build expressions: sr_item_sk, sr_ticket_number +| | | | +| | | 30:EXCHANGE [HASH(sr_item_sk,sr_ticket_number)] +| | | | +| | | 01:SCAN HDFS [tpcds.store_returns] +| | | HDFS partitions=1/1 files=1 size=31.19MB +| | | row-size=16B cardinality=287.51K +| | | +| | 29:EXCHANGE [HASH(ss_item_sk,ss_ticket_number)] +| | | +| | 00:SCAN HDFS [tpcds.store_sales] +| | HDFS partitions=1824/1824 files=1824 size=346.60MB +| | runtime filters: RF016 -> 
ss_sold_date_sk +| | row-size=36B cardinality=2.88M +| | +| 28:AGGREGATE [FINALIZE] +| | output: sum:merge(ws_quantity), sum:merge(ws_wholesale_cost), sum:merge(ws_sales_price) +| | group by: d_year, ws_item_sk, ws_bill_customer_sk +| | row-size=56B cardinality=148.00K +| | +| 27:EXCHANGE [HASH(d_year,ws_item_sk,ws_bill_customer_sk)] +| | +| 11:AGGREGATE [STREAMING] +| | output: sum(ws_quantity), sum(ws_wholesale_cost), sum(ws_sales_price) +| | group by: d_year, ws_item_sk, ws_bill_customer_sk +| | row-size=56B cardinality=148.00K +| | +| 10:HASH JOIN [INNER JOIN, BROADCAST] +| | hash predicates: ws_sold_date_sk = d_date_sk +| | row-size=60B cardinality=148.00K +| | +| |--JOIN BUILD +| | | join-table-id=04 plan-id=05 cohort-id=02 +| | | build expressions: d_date_sk +| | | runtime filters: RF014 <- d_date_sk +| | | +| | 26:EXCHANGE [BROADCAST] +| | | +| | 08:SCAN HDFS [tpcds.date_dim] +| | HDFS partitions=1/1 files=1 size=9.84MB +| | predicates: tpcds.date_dim.d_year = 2002 +| | runtime filters: RF008 -> tpcds.date_dim.d_year +| | row-size=8B cardinality=373 +| | +| 09:HASH JOIN [LEFT OUTER JOIN, BROADCAST] +| | hash predicates: ws_item_sk = wr_item_sk, ws_order_number = wr_order_number +| | other predicates: wr_order_number IS NULL +| | row-size=52B cardinality=719.38K +| | +| |--JOIN BUILD +| | | join-table-id=05 plan-id=06 cohort-id=02 +| | | build expressions: wr_item_sk, wr_order_number +| | | +| | 25:EXCHANGE [BROADCAST] +| | | +| | 07:SCAN HDFS [tpcds.web_returns] +| | HDFS partitions=1/1 files=1 size=9.35MB +| | runtime filters: RF010 -> tpcds.web_returns.wr_item_sk +| | row-size=16B cardinality=71.76K +| | +| 06:SCAN HDFS [tpcds.web_sales] +| HDFS partitions=1/1 files=1 size=140.07MB +| runtime filters: RF009 -> tpcds.web_sales.ws_bill_customer_sk, RF010 -> tpcds.web_sales.ws_item_sk, RF014 -> ws_sold_date_sk +| row-size=36B cardinality=719.38K +| +24:AGGREGATE [FINALIZE] +| output: sum:merge(cs_quantity), sum:merge(cs_wholesale_cost), 
sum:merge(cs_sales_price) +| group by: d_year, cs_item_sk, cs_bill_customer_sk +| row-size=56B cardinality=294.63K +| +23:EXCHANGE [HASH(d_year,cs_item_sk,cs_bill_customer_sk)] +| +17:AGGREGATE [STREAMING] +| output: sum(cs_quantity), sum(cs_wholesale_cost), sum(cs_sales_price) +| group by: d_year, cs_item_sk, cs_bill_customer_sk +| row-size=56B cardinality=294.63K +| +16:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: cs_sold_date_sk = d_date_sk +| row-size=60B cardinality=294.63K +| +|--JOIN BUILD +| | join-table-id=06 plan-id=07 cohort-id=01 +| | build expressions: d_date_sk +| | runtime filters: RF006 <- d_date_sk +| | +| 22:EXCHANGE [BROADCAST] +| | +| 14:SCAN HDFS [tpcds.date_dim] +| HDFS partitions=1/1 files=1 size=9.84MB +| predicates: tpcds.date_dim.d_year = 2002 +| runtime filters: RF000 -> tpcds.date_dim.d_year +| row-size=8B cardinality=373 +| +15:HASH JOIN [LEFT OUTER JOIN, BROADCAST] +| hash predicates: cs_item_sk = cr_item_sk, cs_order_number = cr_order_number +| other predicates: cr_order_number IS NULL +| row-size=52B cardinality=1.44M +| +|--JOIN BUILD +| | join-table-id=07 plan-id=08 cohort-id=01 +| | build expressions: cr_item_sk, cr_order_number +| | +| 21:EXCHANGE [BROADCAST] +| | +| 13:SCAN HDFS [tpcds.catalog_returns] +| HDFS partitions=1/1 files=1 size=20.39MB +| runtime filters: RF002 -> tpcds.catalog_returns.cr_item_sk +| row-size=16B cardinality=144.07K +| +12:SCAN HDFS [tpcds.catalog_sales] + HDFS partitions=1/1 files=1 size=282.20MB + runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk + row-size=36B cardinality=1.44M +---- PARALLELPLANS +PLAN-ROOT SINK +| +36:MERGING-EXCHANGE [UNPARTITIONED] +| order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / 
(coalesce(ws_qty + cs_qty, 1)), 2) ASC +| limit: 100 +| +20:TOP-N [LIMIT=100] +| order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC +| row-size=104B cardinality=100 +| +19:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED] +| hash predicates: d_year = d_year, cs_bill_customer_sk = ss_customer_sk, cs_item_sk = ss_item_sk +| other predicates: (coalesce(sum(ws_quantity), 0) > 0 OR coalesce(sum(cs_quantity), 0) > 0) +| row-size=168B cardinality=3.00K +| +|--JOIN BUILD +| | join-table-id=00 plan-id=01 cohort-id=01 +| | build expressions: d_year, ss_customer_sk, ss_item_sk +| | runtime filters: RF000 <- d_year, RF001 <- ss_customer_sk, RF002 <- ss_item_sk +| | +| 35:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)] +| | +| 18:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED] +| | hash predicates: d_year = d_year, ws_bill_customer_sk = ss_customer_sk, ws_item_sk = ss_item_sk +| | row-size=112B cardinality=3.00K +| | +| |--JOIN BUILD +| | | join-table-id=01 plan-id=02 cohort-id=02 +| | | build expressions: d_year, ss_customer_sk, ss_item_sk +| | | runtime filters: RF008 <- d_year, RF009 <- ss_customer_sk, RF010 <- ss_item_sk +| | | +| | 34:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)] +| | | +| | 33:AGGREGATE [FINALIZE] +| | | output: sum:merge(ss_quantity), sum:merge(ss_wholesale_cost), sum:merge(ss_sales_price) +| | | group by: d_year, ss_item_sk, ss_customer_sk +| | | having: d_year = 2002 +| | | row-size=56B cardinality=3.00K +| | | +| | 32:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)] +| | | +| | 05:AGGREGATE [STREAMING] +| | | output: sum(ss_quantity), sum(ss_wholesale_cost), sum(ss_sales_price) +| | | group by: d_year, ss_item_sk, ss_customer_sk +| | | row-size=56B cardinality=589.03K +| | | +| | 04:HASH JOIN [INNER JOIN, BROADCAST] +| | 
| hash predicates: ss_sold_date_sk = d_date_sk +| | | row-size=60B cardinality=589.03K +| | | +| | |--JOIN BUILD +| | | | join-table-id=02 plan-id=03 cohort-id=03 +| | | | build expressions: d_date_sk +| | | | runtime filters: RF016 <- d_date_sk +| | | | +| | | 31:EXCHANGE [BROADCAST] +| | | | +| | | 02:SCAN HDFS [tpcds.date_dim] +| | | HDFS partitions=1/1 files=1 size=9.84MB +| | | predicates: tpcds.date_dim.d_year = 2002 +| | | row-size=8B cardinality=373 +| | | +| | 03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED] +| | | hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number +| | | other predicates: sr_ticket_number IS NULL +| | | row-size=52B cardinality=2.88M +| | | +| | |--JOIN BUILD +| | | | join-table-id=03 plan-id=04 cohort-id=03 +| | | | build expressions: sr_item_sk, sr_ticket_number +| | | | +| | | 30:EXCHANGE [HASH(sr_item_sk,sr_ticket_number)] +| | | | +| | | 01:SCAN HDFS [tpcds.store_returns] +| | | HDFS partitions=1/1 files=1 size=31.19MB +| | | row-size=16B cardinality=287.51K +| | | +| | 29:EXCHANGE [HASH(ss_item_sk,ss_ticket_number)] +| | | +| | 00:SCAN HDFS [tpcds.store_sales] +| | HDFS partitions=1824/1824 files=1824 size=346.60MB +| | runtime filters: RF016 -> ss_sold_date_sk +| | row-size=36B cardinality=2.88M +| | +| 28:AGGREGATE [FINALIZE] +| | output: sum:merge(ws_quantity), sum:merge(ws_wholesale_cost), sum:merge(ws_sales_price) +| | group by: d_year, ws_item_sk, ws_bill_customer_sk +| | row-size=56B cardinality=148.00K +| | +| 27:EXCHANGE [HASH(d_year,ws_item_sk,ws_bill_customer_sk)] +| | +| 11:AGGREGATE [STREAMING] +| | output: sum(ws_quantity), sum(ws_wholesale_cost), sum(ws_sales_price) +| | group by: d_year, ws_item_sk, ws_bill_customer_sk +| | row-size=56B cardinality=148.00K +| | +| 10:HASH JOIN [INNER JOIN, BROADCAST] +| | hash predicates: ws_sold_date_sk = d_date_sk +| | row-size=60B cardinality=148.00K +| | +| |--JOIN BUILD +| | | join-table-id=04 plan-id=05 cohort-id=02 +| | | build expressions: d_date_sk +| | 
| runtime filters: RF014 <- d_date_sk +| | | +| | 26:EXCHANGE [BROADCAST] +| | | +| | 08:SCAN HDFS [tpcds.date_dim] +| | HDFS partitions=1/1 files=1 size=9.84MB +| | predicates: tpcds.date_dim.d_year = 2002 +| | runtime filters: RF008 -> tpcds.date_dim.d_year +| | row-size=8B cardinality=373 +| | +| 09:HASH JOIN [LEFT OUTER JOIN, BROADCAST] +| | hash predicates: ws_item_sk = wr_item_sk, ws_order_number = wr_order_number +| | other predicates: wr_order_number IS NULL +| | row-size=52B cardinality=719.38K +| | +| |--JOIN BUILD +| | | join-table-id=05 plan-id=06 cohort-id=02 +| | | build expressions: wr_item_sk, wr_order_number +| | | +| | 25:EXCHANGE [BROADCAST] +| | | +| | 07:SCAN HDFS [tpcds.web_returns] +| | HDFS partitions=1/1 files=1 size=9.35MB +| | runtime filters: RF010 -> tpcds.web_returns.wr_item_sk +| | row-size=16B cardinality=71.76K +| | +| 06:SCAN HDFS [tpcds.web_sales] +| HDFS partitions=1/1 files=1 size=140.07MB +| runtime filters: RF009 -> tpcds.web_sales.ws_bill_customer_sk, RF010 -> tpcds.web_sales.ws_item_sk, RF014 -> ws_sold_date_sk +| row-size=36B cardinality=719.38K +| +24:AGGREGATE [FINALIZE] +| output: sum:merge(cs_quantity), sum:merge(cs_wholesale_cost), sum:merge(cs_sales_price) +| group by: d_year, cs_item_sk, cs_bill_customer_sk +| row-size=56B cardinality=294.63K +| +23:EXCHANGE [HASH(d_year,cs_item_sk,cs_bill_customer_sk)] +| +17:AGGREGATE [STREAMING] +| output: sum(cs_quantity), sum(cs_wholesale_cost), sum(cs_sales_price) +| group by: d_year, cs_item_sk, cs_bill_customer_sk +| row-size=56B cardinality=294.63K +| +16:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: cs_sold_date_sk = d_date_sk +| row-size=60B cardinality=294.63K +| +|--JOIN BUILD +| | join-table-id=06 plan-id=07 cohort-id=01 +| | build expressions: d_date_sk +| | runtime filters: RF006 <- d_date_sk +| | +| 22:EXCHANGE [BROADCAST] +| | +| 14:SCAN HDFS [tpcds.date_dim] +| HDFS partitions=1/1 files=1 size=9.84MB +| predicates: tpcds.date_dim.d_year = 2002 +| runtime 
filters: RF000 -> tpcds.date_dim.d_year +| row-size=8B cardinality=373 +| +15:HASH JOIN [LEFT OUTER JOIN, BROADCAST] +| hash predicates: cs_item_sk = cr_item_sk, cs_order_number = cr_order_number +| other predicates: cr_order_number IS NULL +| row-size=52B cardinality=1.44M +| +|--JOIN BUILD +| | join-table-id=07 plan-id=08 cohort-id=01 +| | build expressions: cr_item_sk, cr_order_number +| | +| 21:EXCHANGE [BROADCAST] +| | +| 13:SCAN HDFS [tpcds.catalog_returns] +| HDFS partitions=1/1 files=1 size=20.39MB +| runtime filters: RF002 -> tpcds.catalog_returns.cr_item_sk +| row-size=16B cardinality=144.07K +| +12:SCAN HDFS [tpcds.catalog_sales] + HDFS partitions=1/1 files=1 size=282.20MB + runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk + row-size=36B cardinality=1.44M +====