This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9dd0abbb373c0256a948cdf54a1a677230e7f5cb
Author: Aman Sinha <amsi...@cloudera.com>
AuthorDate: Thu Dec 10 23:20:05 2020 -0800

    IMPALA-10287: Include parallelism in cost comparison of broadcast vs partition
    
    The current planner tends to pick broadcast distribution in some cases
    even when partition distribution would be more optimal (seen in
    TPC-DS performance runs).
    
    This patch adds 2 query options:
     - use_dop_for_costing (type:boolean, default:true)
     - broadcast_to_partition_factor (type:double, default:1.0)
    With use_dop_for_costing enabled, the distributed planner multiplies
    the cost of the broadcast join's build side by C * sqrt(m), where
      m = the degree of parallelism of the join node, and
      C = the broadcast_to_partition_factor.
    This allows the planner to consider partition distribution more
    favorably where appropriate.
    
    The choice of sqrt in the calculation is not a final choice
    at this point but is intended to model a non-linear relationship
    between mt_dop and the query performance. After further performance
    testing with tuning the above factor, we can establish a better
    correlation and refine the formula (tracked by IMPALA-10395).
    
    Testing:
     - Added a new test file with TPC-DS Q78 which shows partition
       distribution for a left-outer join (with store_returns on the right
       input) in the query when the query options are enabled (it chooses
       broadcast otherwise).
     - Ran PlannerTest and TpcdsPlannerTest.
     - Ran e2e tests for Tpcds and Tpch.
    
    Change-Id: Idff569299e5c78720ca17c616a531adac78208e1
    Reviewed-on: http://gerrit.cloudera.org:8080/16864
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/service/query-options.cc                    |  16 +
 be/src/service/query-options.h                     |   6 +-
 common/thrift/ImpalaInternalService.thrift         |   7 +
 common/thrift/ImpalaService.thrift                 |  11 +
 .../apache/impala/planner/DistributedPlanner.java  |  21 +-
 .../org/apache/impala/planner/PlannerTest.java     |   8 +
 .../queries/PlannerTest/tpcds-dist-method.test     | 538 +++++++++++++++++++++
 7 files changed, 603 insertions(+), 4 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index f2cd720..cc65f08 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -988,6 +988,22 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         query_options->__set_report_skew_limit(skew_threshold);
         break;
       }
+      case TImpalaQueryOptions::USE_DOP_FOR_COSTING: {
+        query_options->__set_use_dop_for_costing(IsTrue(value));
+        break;
+      }
+      case TImpalaQueryOptions::BROADCAST_TO_PARTITION_FACTOR: {
+        StringParser::ParseResult result;
+        const double val =
+            StringParser::StringToFloat<double>(value.c_str(), value.length(), 
&result);
+        if (result != StringParser::PARSE_SUCCESS || val < 0 || val > 1000) {
+          return Status(Substitute("Invalid broadcast to partition factor 
'$0'. "
+                                   "Only values from 0 to 1000 are allowed.",
+              value));
+        }
+        query_options->__set_broadcast_to_partition_factor(val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << 
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index d61e47d..9abd042 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::OPTIMIZE_SIMPLE_LIMIT + 1);\
+      TImpalaQueryOptions::BROADCAST_TO_PARTITION_FACTOR + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -225,6 +225,10 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(optimize_simple_limit, OPTIMIZE_SIMPLE_LIMIT,\
       TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(use_dop_for_costing, USE_DOP_FOR_COSTING,\
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(broadcast_to_partition_factor, BROADCAST_TO_PARTITION_FACTOR,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 22309a9..d61b9eb 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -468,6 +468,13 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   117: optional bool optimize_simple_limit = false;
+
+  // See comment in ImpalaService.thrift
+  118: optional bool use_dop_for_costing = true;
+
+  // See comment in ImpalaService.thrift
+  119: optional double broadcast_to_partition_factor = 1.0;
+
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 007ae27..f553d76 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -602,6 +602,17 @@ enum TImpalaQueryOptions {
   // This option is opt-in by default as this optimization may in some cases 
produce
   // fewer (but correct) rows than the limit value in the query.
   OPTIMIZE_SIMPLE_LIMIT = 116
+
+  // When true, the degree of parallelism (if > 1) is used in costing decision
+  // of a broadcast vs partition distribution.
+  USE_DOP_FOR_COSTING = 117
+
+  // A multiplying factor between 0 to 1000 that is applied to the costing 
decision of
+  // a broadcast vs partition distribution. Fractional values between 0 to 1 
favor
+  // broadcast by reducing the build side cost of a broadcast join. Values 
above 1.0
+  // favor partition distribution.
+  BROADCAST_TO_PARTITION_FACTOR = 118
+
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 864a1c1..f284e32 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -497,7 +497,24 @@ public class DistributedPlanner {
         // RHS data must be broadcast once to each node.
         // TODO: IMPALA-9176: this is inaccurate for NAAJ until IMPALA-9176 is 
fixed
         // because it must be broadcast once per instance.
-        broadcastCost = 2 * rhsDataSize * leftChildNodes;
+        long dataPayload = rhsDataSize * leftChildNodes;
+        long hashTblBuildCost = dataPayload;
+        if (mt_dop > 1 && ctx_.getQueryOptions().use_dop_for_costing) {
+          // In the broadcast join a single thread per node is building the 
hash
+          // table of size N compared to the partition case where m threads are
+          // building hash tables of size N/m each (assuming uniform 
distribution).
+          // Hence, the build side is faster in the latter case. For relative 
costing,
+          // we multiply the hash table build cost by C sqrt(m) where m = plan 
node's
+          // parallelism and C = a coefficient that controls the function's 
rate of
+          // growth (a tunable parameter). We use the sqrt to model a 
non-linear
+          // function since the slowdown with broadcast is not exactly linear 
(TODO:
+          // more analysis is needed to establish an accurate correlation).
+          PlanNode leftPlanRoot = leftChildFragment.getPlanRoot();
+          int actual_dop = 
leftPlanRoot.getNumInstances()/leftPlanRoot.getNumNodes();
+          hashTblBuildCost *= (long) 
(ctx_.getQueryOptions().broadcast_to_partition_factor
+              * Math.max(1.0, Math.sqrt(actual_dop)));
+        }
+        broadcastCost = dataPayload + hashTblBuildCost;
       }
     }
     if (LOG.isTraceEnabled()) {
@@ -596,8 +613,6 @@ public class DistributedPlanner {
          ctx_.getQueryOptions().getDefault_join_distribution_mode());
    }
 
-   int mt_dop = ctx_.getQueryOptions().mt_dop;
-
    // Decide the distribution mode based on the estimated costs, the mem limit 
and
    // the broadcast bytes limit. The last value is a safety check to ensure we
    // don't broadcast very large inputs (for example in case the broadcast 
cost was
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index c812430..3f42933 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1111,4 +1111,12 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("optimize-simple-limit", options);
   }
 
+  /**
+   * Test the distribution method for a join
+   */
+  @Test
+  public void testDistributionMethod() {
+    runPlannerTestFile("tpcds-dist-method", "tpcds");
+  }
+
 }
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-dist-method.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-dist-method.test
new file mode 100644
index 0000000..72d1bb0
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-dist-method.test
@@ -0,0 +1,538 @@
+# Q78
+with ws as
+  (select d_year AS ws_sold_year, ws_item_sk,
+    ws_bill_customer_sk ws_customer_sk,
+    sum(ws_quantity) ws_qty,
+    sum(ws_wholesale_cost) ws_wc,
+    sum(ws_sales_price) ws_sp
+   from web_sales
+   left join web_returns on wr_order_number=ws_order_number and 
ws_item_sk=wr_item_sk
+   join date_dim on ws_sold_date_sk = d_date_sk
+   where wr_order_number is null
+   group by d_year, ws_item_sk, ws_bill_customer_sk
+   ),
+cs as
+  (select d_year AS cs_sold_year, cs_item_sk,
+    cs_bill_customer_sk cs_customer_sk,
+    sum(cs_quantity) cs_qty,
+    sum(cs_wholesale_cost) cs_wc,
+    sum(cs_sales_price) cs_sp
+   from catalog_sales
+   left join catalog_returns on cr_order_number=cs_order_number and 
cs_item_sk=cr_item_sk
+   join date_dim on cs_sold_date_sk = d_date_sk
+   where cr_order_number is null
+   group by d_year, cs_item_sk, cs_bill_customer_sk
+   ),
+ss as
+  (select d_year AS ss_sold_year, ss_item_sk,
+    ss_customer_sk,
+    sum(ss_quantity) ss_qty,
+    sum(ss_wholesale_cost) ss_wc,
+    sum(ss_sales_price) ss_sp
+   from store_sales
+   left join store_returns on sr_ticket_number=ss_ticket_number and 
ss_item_sk=sr_item_sk
+   join date_dim on ss_sold_date_sk = d_date_sk
+   where sr_ticket_number is null
+   group by d_year, ss_item_sk, ss_customer_sk
+   )
+ select
+ss_sold_year, ss_item_sk, ss_customer_sk,
+round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,
+ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,
+coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,
+coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,
+coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price
+from ss
+left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and 
ws_customer_sk=ss_customer_sk)
+left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and 
cs_customer_sk=ss_customer_sk)
+where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2002
+order by
+  ss_sold_year, ss_item_sk, ss_customer_sk,
+  ss_qty desc, ss_wc desc, ss_sp desc,
+  other_chan_qty,
+  other_chan_wholesale_cost,
+  other_chan_sales_price,
+  round(ss_qty/(coalesce(ws_qty+cs_qty,1)),2)
+limit 100
+---- QUERYOPTIONS
+mt_dop=12
+use_dop_for_costing=true
+broadcast_to_partition_factor=4.0
+---- PLAN
+PLAN-ROOT SINK
+|
+20:TOP-N [LIMIT=100]
+|  order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty 
DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, 
coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + 
coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
+|  row-size=104B cardinality=100
+|
+19:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: d_year = d_year, cs_bill_customer_sk = ss_customer_sk, 
cs_item_sk = ss_item_sk
+|  other predicates: (coalesce(sum(ws_quantity), 0) > 0 OR 
coalesce(sum(cs_quantity), 0) > 0)
+|  runtime filters: RF000 <- d_year, RF001 <- ss_customer_sk, RF002 <- 
ss_item_sk
+|  row-size=168B cardinality=3.00K
+|
+|--18:HASH JOIN [RIGHT OUTER JOIN]
+|  |  hash predicates: d_year = d_year, ws_bill_customer_sk = ss_customer_sk, 
ws_item_sk = ss_item_sk
+|  |  runtime filters: RF008 <- d_year, RF009 <- ss_customer_sk, RF010 <- 
ss_item_sk
+|  |  row-size=112B cardinality=3.00K
+|  |
+|  |--05:AGGREGATE [FINALIZE]
+|  |  |  output: sum(ss_quantity), sum(ss_wholesale_cost), sum(ss_sales_price)
+|  |  |  group by: d_year, ss_item_sk, ss_customer_sk
+|  |  |  having: d_year = 2002
+|  |  |  row-size=56B cardinality=3.00K
+|  |  |
+|  |  04:HASH JOIN [INNER JOIN]
+|  |  |  hash predicates: ss_sold_date_sk = d_date_sk
+|  |  |  runtime filters: RF016 <- d_date_sk
+|  |  |  row-size=60B cardinality=589.03K
+|  |  |
+|  |  |--02:SCAN HDFS [tpcds.date_dim]
+|  |  |     HDFS partitions=1/1 files=1 size=9.84MB
+|  |  |     predicates: tpcds.date_dim.d_year = 2002
+|  |  |     row-size=8B cardinality=373
+|  |  |
+|  |  03:HASH JOIN [LEFT OUTER JOIN]
+|  |  |  hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = 
sr_ticket_number
+|  |  |  other predicates: sr_ticket_number IS NULL
+|  |  |  row-size=52B cardinality=2.88M
+|  |  |
+|  |  |--01:SCAN HDFS [tpcds.store_returns]
+|  |  |     HDFS partitions=1/1 files=1 size=31.19MB
+|  |  |     row-size=16B cardinality=287.51K
+|  |  |
+|  |  00:SCAN HDFS [tpcds.store_sales]
+|  |     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|  |     runtime filters: RF016 -> ss_sold_date_sk
+|  |     row-size=36B cardinality=2.88M
+|  |
+|  11:AGGREGATE [FINALIZE]
+|  |  output: sum(ws_quantity), sum(ws_wholesale_cost), sum(ws_sales_price)
+|  |  group by: d_year, ws_item_sk, ws_bill_customer_sk
+|  |  row-size=56B cardinality=148.00K
+|  |
+|  10:HASH JOIN [INNER JOIN]
+|  |  hash predicates: ws_sold_date_sk = d_date_sk
+|  |  runtime filters: RF014 <- d_date_sk
+|  |  row-size=60B cardinality=148.00K
+|  |
+|  |--08:SCAN HDFS [tpcds.date_dim]
+|  |     HDFS partitions=1/1 files=1 size=9.84MB
+|  |     predicates: tpcds.date_dim.d_year = 2002
+|  |     runtime filters: RF008 -> tpcds.date_dim.d_year
+|  |     row-size=8B cardinality=373
+|  |
+|  09:HASH JOIN [LEFT OUTER JOIN]
+|  |  hash predicates: ws_item_sk = wr_item_sk, ws_order_number = 
wr_order_number
+|  |  other predicates: wr_order_number IS NULL
+|  |  row-size=52B cardinality=719.38K
+|  |
+|  |--07:SCAN HDFS [tpcds.web_returns]
+|  |     HDFS partitions=1/1 files=1 size=9.35MB
+|  |     runtime filters: RF010 -> tpcds.web_returns.wr_item_sk
+|  |     row-size=16B cardinality=71.76K
+|  |
+|  06:SCAN HDFS [tpcds.web_sales]
+|     HDFS partitions=1/1 files=1 size=140.07MB
+|     runtime filters: RF009 -> tpcds.web_sales.ws_bill_customer_sk, RF010 -> 
tpcds.web_sales.ws_item_sk, RF014 -> ws_sold_date_sk
+|     row-size=36B cardinality=719.38K
+|
+17:AGGREGATE [FINALIZE]
+|  output: sum(cs_quantity), sum(cs_wholesale_cost), sum(cs_sales_price)
+|  group by: d_year, cs_item_sk, cs_bill_customer_sk
+|  row-size=56B cardinality=294.63K
+|
+16:HASH JOIN [INNER JOIN]
+|  hash predicates: cs_sold_date_sk = d_date_sk
+|  runtime filters: RF006 <- d_date_sk
+|  row-size=60B cardinality=294.63K
+|
+|--14:SCAN HDFS [tpcds.date_dim]
+|     HDFS partitions=1/1 files=1 size=9.84MB
+|     predicates: tpcds.date_dim.d_year = 2002
+|     runtime filters: RF000 -> tpcds.date_dim.d_year
+|     row-size=8B cardinality=373
+|
+15:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: cs_item_sk = cr_item_sk, cs_order_number = cr_order_number
+|  other predicates: cr_order_number IS NULL
+|  row-size=52B cardinality=1.44M
+|
+|--13:SCAN HDFS [tpcds.catalog_returns]
+|     HDFS partitions=1/1 files=1 size=20.39MB
+|     runtime filters: RF002 -> tpcds.catalog_returns.cr_item_sk
+|     row-size=16B cardinality=144.07K
+|
+12:SCAN HDFS [tpcds.catalog_sales]
+   HDFS partitions=1/1 files=1 size=282.20MB
+   runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> 
tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk
+   row-size=36B cardinality=1.44M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+36:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty 
DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, 
coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + 
coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
+|  limit: 100
+|
+20:TOP-N [LIMIT=100]
+|  order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty 
DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, 
coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + 
coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
+|  row-size=104B cardinality=100
+|
+19:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  hash predicates: d_year = d_year, cs_bill_customer_sk = ss_customer_sk, 
cs_item_sk = ss_item_sk
+|  other predicates: (coalesce(sum(ws_quantity), 0) > 0 OR 
coalesce(sum(cs_quantity), 0) > 0)
+|  row-size=168B cardinality=3.00K
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: d_year, ss_customer_sk, ss_item_sk
+|  |  runtime filters: RF000 <- d_year, RF001 <- ss_customer_sk, RF002 <- 
ss_item_sk
+|  |
+|  35:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
+|  |
+|  18:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  |  hash predicates: d_year = d_year, ws_bill_customer_sk = ss_customer_sk, 
ws_item_sk = ss_item_sk
+|  |  row-size=112B cardinality=3.00K
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: d_year, ss_customer_sk, ss_item_sk
+|  |  |  runtime filters: RF008 <- d_year, RF009 <- ss_customer_sk, RF010 <- 
ss_item_sk
+|  |  |
+|  |  34:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
+|  |  |
+|  |  33:AGGREGATE [FINALIZE]
+|  |  |  output: sum:merge(ss_quantity), sum:merge(ss_wholesale_cost), 
sum:merge(ss_sales_price)
+|  |  |  group by: d_year, ss_item_sk, ss_customer_sk
+|  |  |  having: d_year = 2002
+|  |  |  row-size=56B cardinality=3.00K
+|  |  |
+|  |  32:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
+|  |  |
+|  |  05:AGGREGATE [STREAMING]
+|  |  |  output: sum(ss_quantity), sum(ss_wholesale_cost), sum(ss_sales_price)
+|  |  |  group by: d_year, ss_item_sk, ss_customer_sk
+|  |  |  row-size=56B cardinality=589.03K
+|  |  |
+|  |  04:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  |  hash predicates: ss_sold_date_sk = d_date_sk
+|  |  |  row-size=60B cardinality=589.03K
+|  |  |
+|  |  |--JOIN BUILD
+|  |  |  |  join-table-id=02 plan-id=03 cohort-id=03
+|  |  |  |  build expressions: d_date_sk
+|  |  |  |  runtime filters: RF016 <- d_date_sk
+|  |  |  |
+|  |  |  31:EXCHANGE [BROADCAST]
+|  |  |  |
+|  |  |  02:SCAN HDFS [tpcds.date_dim]
+|  |  |     HDFS partitions=1/1 files=1 size=9.84MB
+|  |  |     predicates: tpcds.date_dim.d_year = 2002
+|  |  |     row-size=8B cardinality=373
+|  |  |
+|  |  03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
+|  |  |  hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = 
sr_ticket_number
+|  |  |  other predicates: sr_ticket_number IS NULL
+|  |  |  row-size=52B cardinality=2.88M
+|  |  |
+|  |  |--JOIN BUILD
+|  |  |  |  join-table-id=03 plan-id=04 cohort-id=03
+|  |  |  |  build expressions: sr_item_sk, sr_ticket_number
+|  |  |  |
+|  |  |  30:EXCHANGE [HASH(sr_item_sk,sr_ticket_number)]
+|  |  |  |
+|  |  |  01:SCAN HDFS [tpcds.store_returns]
+|  |  |     HDFS partitions=1/1 files=1 size=31.19MB
+|  |  |     row-size=16B cardinality=287.51K
+|  |  |
+|  |  29:EXCHANGE [HASH(ss_item_sk,ss_ticket_number)]
+|  |  |
+|  |  00:SCAN HDFS [tpcds.store_sales]
+|  |     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|  |     runtime filters: RF016 -> ss_sold_date_sk
+|  |     row-size=36B cardinality=2.88M
+|  |
+|  28:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(ws_quantity), sum:merge(ws_wholesale_cost), 
sum:merge(ws_sales_price)
+|  |  group by: d_year, ws_item_sk, ws_bill_customer_sk
+|  |  row-size=56B cardinality=148.00K
+|  |
+|  27:EXCHANGE [HASH(d_year,ws_item_sk,ws_bill_customer_sk)]
+|  |
+|  11:AGGREGATE [STREAMING]
+|  |  output: sum(ws_quantity), sum(ws_wholesale_cost), sum(ws_sales_price)
+|  |  group by: d_year, ws_item_sk, ws_bill_customer_sk
+|  |  row-size=56B cardinality=148.00K
+|  |
+|  10:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: ws_sold_date_sk = d_date_sk
+|  |  row-size=60B cardinality=148.00K
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=04 plan-id=05 cohort-id=02
+|  |  |  build expressions: d_date_sk
+|  |  |  runtime filters: RF014 <- d_date_sk
+|  |  |
+|  |  26:EXCHANGE [BROADCAST]
+|  |  |
+|  |  08:SCAN HDFS [tpcds.date_dim]
+|  |     HDFS partitions=1/1 files=1 size=9.84MB
+|  |     predicates: tpcds.date_dim.d_year = 2002
+|  |     runtime filters: RF008 -> tpcds.date_dim.d_year
+|  |     row-size=8B cardinality=373
+|  |
+|  09:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  |  hash predicates: ws_item_sk = wr_item_sk, ws_order_number = 
wr_order_number
+|  |  other predicates: wr_order_number IS NULL
+|  |  row-size=52B cardinality=719.38K
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=05 plan-id=06 cohort-id=02
+|  |  |  build expressions: wr_item_sk, wr_order_number
+|  |  |
+|  |  25:EXCHANGE [BROADCAST]
+|  |  |
+|  |  07:SCAN HDFS [tpcds.web_returns]
+|  |     HDFS partitions=1/1 files=1 size=9.35MB
+|  |     runtime filters: RF010 -> tpcds.web_returns.wr_item_sk
+|  |     row-size=16B cardinality=71.76K
+|  |
+|  06:SCAN HDFS [tpcds.web_sales]
+|     HDFS partitions=1/1 files=1 size=140.07MB
+|     runtime filters: RF009 -> tpcds.web_sales.ws_bill_customer_sk, RF010 -> 
tpcds.web_sales.ws_item_sk, RF014 -> ws_sold_date_sk
+|     row-size=36B cardinality=719.38K
+|
+24:AGGREGATE [FINALIZE]
+|  output: sum:merge(cs_quantity), sum:merge(cs_wholesale_cost), 
sum:merge(cs_sales_price)
+|  group by: d_year, cs_item_sk, cs_bill_customer_sk
+|  row-size=56B cardinality=294.63K
+|
+23:EXCHANGE [HASH(d_year,cs_item_sk,cs_bill_customer_sk)]
+|
+17:AGGREGATE [STREAMING]
+|  output: sum(cs_quantity), sum(cs_wholesale_cost), sum(cs_sales_price)
+|  group by: d_year, cs_item_sk, cs_bill_customer_sk
+|  row-size=56B cardinality=294.63K
+|
+16:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: cs_sold_date_sk = d_date_sk
+|  row-size=60B cardinality=294.63K
+|
+|--JOIN BUILD
+|  |  join-table-id=06 plan-id=07 cohort-id=01
+|  |  build expressions: d_date_sk
+|  |  runtime filters: RF006 <- d_date_sk
+|  |
+|  22:EXCHANGE [BROADCAST]
+|  |
+|  14:SCAN HDFS [tpcds.date_dim]
+|     HDFS partitions=1/1 files=1 size=9.84MB
+|     predicates: tpcds.date_dim.d_year = 2002
+|     runtime filters: RF000 -> tpcds.date_dim.d_year
+|     row-size=8B cardinality=373
+|
+15:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: cs_item_sk = cr_item_sk, cs_order_number = cr_order_number
+|  other predicates: cr_order_number IS NULL
+|  row-size=52B cardinality=1.44M
+|
+|--JOIN BUILD
+|  |  join-table-id=07 plan-id=08 cohort-id=01
+|  |  build expressions: cr_item_sk, cr_order_number
+|  |
+|  21:EXCHANGE [BROADCAST]
+|  |
+|  13:SCAN HDFS [tpcds.catalog_returns]
+|     HDFS partitions=1/1 files=1 size=20.39MB
+|     runtime filters: RF002 -> tpcds.catalog_returns.cr_item_sk
+|     row-size=16B cardinality=144.07K
+|
+12:SCAN HDFS [tpcds.catalog_sales]
+   HDFS partitions=1/1 files=1 size=282.20MB
+   runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> 
tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk
+   row-size=36B cardinality=1.44M
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+36:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty 
DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, 
coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + 
coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
+|  limit: 100
+|
+20:TOP-N [LIMIT=100]
+|  order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty 
DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, 
coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + 
coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
+|  row-size=104B cardinality=100
+|
+19:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  hash predicates: d_year = d_year, cs_bill_customer_sk = ss_customer_sk, 
cs_item_sk = ss_item_sk
+|  other predicates: (coalesce(sum(ws_quantity), 0) > 0 OR 
coalesce(sum(cs_quantity), 0) > 0)
+|  row-size=168B cardinality=3.00K
+|
+|--JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: d_year, ss_customer_sk, ss_item_sk
+|  |  runtime filters: RF000 <- d_year, RF001 <- ss_customer_sk, RF002 <- 
ss_item_sk
+|  |
+|  35:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
+|  |
+|  18:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  |  hash predicates: d_year = d_year, ws_bill_customer_sk = ss_customer_sk, 
ws_item_sk = ss_item_sk
+|  |  row-size=112B cardinality=3.00K
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: d_year, ss_customer_sk, ss_item_sk
+|  |  |  runtime filters: RF008 <- d_year, RF009 <- ss_customer_sk, RF010 <- 
ss_item_sk
+|  |  |
+|  |  34:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
+|  |  |
+|  |  33:AGGREGATE [FINALIZE]
+|  |  |  output: sum:merge(ss_quantity), sum:merge(ss_wholesale_cost), 
sum:merge(ss_sales_price)
+|  |  |  group by: d_year, ss_item_sk, ss_customer_sk
+|  |  |  having: d_year = 2002
+|  |  |  row-size=56B cardinality=3.00K
+|  |  |
+|  |  32:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
+|  |  |
+|  |  05:AGGREGATE [STREAMING]
+|  |  |  output: sum(ss_quantity), sum(ss_wholesale_cost), sum(ss_sales_price)
+|  |  |  group by: d_year, ss_item_sk, ss_customer_sk
+|  |  |  row-size=56B cardinality=589.03K
+|  |  |
+|  |  04:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  |  hash predicates: ss_sold_date_sk = d_date_sk
+|  |  |  row-size=60B cardinality=589.03K
+|  |  |
+|  |  |--JOIN BUILD
+|  |  |  |  join-table-id=02 plan-id=03 cohort-id=03
+|  |  |  |  build expressions: d_date_sk
+|  |  |  |  runtime filters: RF016 <- d_date_sk
+|  |  |  |
+|  |  |  31:EXCHANGE [BROADCAST]
+|  |  |  |
+|  |  |  02:SCAN HDFS [tpcds.date_dim]
+|  |  |     HDFS partitions=1/1 files=1 size=9.84MB
+|  |  |     predicates: tpcds.date_dim.d_year = 2002
+|  |  |     row-size=8B cardinality=373
+|  |  |
+|  |  03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
+|  |  |  hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = 
sr_ticket_number
+|  |  |  other predicates: sr_ticket_number IS NULL
+|  |  |  row-size=52B cardinality=2.88M
+|  |  |
+|  |  |--JOIN BUILD
+|  |  |  |  join-table-id=03 plan-id=04 cohort-id=03
+|  |  |  |  build expressions: sr_item_sk, sr_ticket_number
+|  |  |  |
+|  |  |  30:EXCHANGE [HASH(sr_item_sk,sr_ticket_number)]
+|  |  |  |
+|  |  |  01:SCAN HDFS [tpcds.store_returns]
+|  |  |     HDFS partitions=1/1 files=1 size=31.19MB
+|  |  |     row-size=16B cardinality=287.51K
+|  |  |
+|  |  29:EXCHANGE [HASH(ss_item_sk,ss_ticket_number)]
+|  |  |
+|  |  00:SCAN HDFS [tpcds.store_sales]
+|  |     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|  |     runtime filters: RF016 -> ss_sold_date_sk
+|  |     row-size=36B cardinality=2.88M
+|  |
+|  28:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(ws_quantity), sum:merge(ws_wholesale_cost), 
sum:merge(ws_sales_price)
+|  |  group by: d_year, ws_item_sk, ws_bill_customer_sk
+|  |  row-size=56B cardinality=148.00K
+|  |
+|  27:EXCHANGE [HASH(d_year,ws_item_sk,ws_bill_customer_sk)]
+|  |
+|  11:AGGREGATE [STREAMING]
+|  |  output: sum(ws_quantity), sum(ws_wholesale_cost), sum(ws_sales_price)
+|  |  group by: d_year, ws_item_sk, ws_bill_customer_sk
+|  |  row-size=56B cardinality=148.00K
+|  |
+|  10:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash predicates: ws_sold_date_sk = d_date_sk
+|  |  row-size=60B cardinality=148.00K
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=04 plan-id=05 cohort-id=02
+|  |  |  build expressions: d_date_sk
+|  |  |  runtime filters: RF014 <- d_date_sk
+|  |  |
+|  |  26:EXCHANGE [BROADCAST]
+|  |  |
+|  |  08:SCAN HDFS [tpcds.date_dim]
+|  |     HDFS partitions=1/1 files=1 size=9.84MB
+|  |     predicates: tpcds.date_dim.d_year = 2002
+|  |     runtime filters: RF008 -> tpcds.date_dim.d_year
+|  |     row-size=8B cardinality=373
+|  |
+|  09:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  |  hash predicates: ws_item_sk = wr_item_sk, ws_order_number = 
wr_order_number
+|  |  other predicates: wr_order_number IS NULL
+|  |  row-size=52B cardinality=719.38K
+|  |
+|  |--JOIN BUILD
+|  |  |  join-table-id=05 plan-id=06 cohort-id=02
+|  |  |  build expressions: wr_item_sk, wr_order_number
+|  |  |
+|  |  25:EXCHANGE [BROADCAST]
+|  |  |
+|  |  07:SCAN HDFS [tpcds.web_returns]
+|  |     HDFS partitions=1/1 files=1 size=9.35MB
+|  |     runtime filters: RF010 -> tpcds.web_returns.wr_item_sk
+|  |     row-size=16B cardinality=71.76K
+|  |
+|  06:SCAN HDFS [tpcds.web_sales]
+|     HDFS partitions=1/1 files=1 size=140.07MB
+|     runtime filters: RF009 -> tpcds.web_sales.ws_bill_customer_sk, RF010 -> 
tpcds.web_sales.ws_item_sk, RF014 -> ws_sold_date_sk
+|     row-size=36B cardinality=719.38K
+|
+24:AGGREGATE [FINALIZE]
+|  output: sum:merge(cs_quantity), sum:merge(cs_wholesale_cost), 
sum:merge(cs_sales_price)
+|  group by: d_year, cs_item_sk, cs_bill_customer_sk
+|  row-size=56B cardinality=294.63K
+|
+23:EXCHANGE [HASH(d_year,cs_item_sk,cs_bill_customer_sk)]
+|
+17:AGGREGATE [STREAMING]
+|  output: sum(cs_quantity), sum(cs_wholesale_cost), sum(cs_sales_price)
+|  group by: d_year, cs_item_sk, cs_bill_customer_sk
+|  row-size=56B cardinality=294.63K
+|
+16:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: cs_sold_date_sk = d_date_sk
+|  row-size=60B cardinality=294.63K
+|
+|--JOIN BUILD
+|  |  join-table-id=06 plan-id=07 cohort-id=01
+|  |  build expressions: d_date_sk
+|  |  runtime filters: RF006 <- d_date_sk
+|  |
+|  22:EXCHANGE [BROADCAST]
+|  |
+|  14:SCAN HDFS [tpcds.date_dim]
+|     HDFS partitions=1/1 files=1 size=9.84MB
+|     predicates: tpcds.date_dim.d_year = 2002
+|     runtime filters: RF000 -> tpcds.date_dim.d_year
+|     row-size=8B cardinality=373
+|
+15:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: cs_item_sk = cr_item_sk, cs_order_number = cr_order_number
+|  other predicates: cr_order_number IS NULL
+|  row-size=52B cardinality=1.44M
+|
+|--JOIN BUILD
+|  |  join-table-id=07 plan-id=08 cohort-id=01
+|  |  build expressions: cr_item_sk, cr_order_number
+|  |
+|  21:EXCHANGE [BROADCAST]
+|  |
+|  13:SCAN HDFS [tpcds.catalog_returns]
+|     HDFS partitions=1/1 files=1 size=20.39MB
+|     runtime filters: RF002 -> tpcds.catalog_returns.cr_item_sk
+|     row-size=16B cardinality=144.07K
+|
+12:SCAN HDFS [tpcds.catalog_sales]
+   HDFS partitions=1/1 files=1 size=282.20MB
+   runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> 
tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk
+   row-size=36B cardinality=1.44M
+====

Reply via email to