Hey folks,

I've been digging into a couple of TPCDS queries that are unexpectedly slow
and uncovered what seems to be some surprising behavior in the planner
concerning runtime filter assignment. Consider the following query in the
TPCDS schema:

select straight_join *
  from promotion a
  join item b on a.p_item_sk = b.i_item_sk
  join [shuffle] store_sales c on b.i_item_sk = c.ss_item_sk
  where b.i_color = 'xxx'

This generates a plan that looks like this:
http://people.apache.org/~todd/plan.png

Or in text form:
Max Per-Host Resource Reservation: Memory=85.88MB
Per-Host Resource Estimates: Memory=298.14GB

F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
PLAN-ROOT SINK
|  mem-estimate=0B mem-reservation=0B
|
08:EXCHANGE [UNPARTITIONED]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=0,1,2 row-size=911B cardinality=14201171
|
F03:PLAN FRAGMENT [HASH(b.i_item_sk)] hosts=1 instances=1
Per-Host Resources: mem-estimate=295.06GB mem-reservation=50.00MB runtime-filters-memory=16.00MB
04:HASH JOIN [INNER JOIN, PARTITIONED]
|  hash predicates: b.i_item_sk = c.ss_item_sk
|  fk/pk conjuncts: none
|  runtime filters: RF000[bloom] <- c.ss_item_sk
|  mem-estimate=295.04GB mem-reservation=34.00MB spill-buffer=2.00MB
|  tuple-ids=0,1,2 row-size=911B cardinality=14201171
|
|--07:EXCHANGE [HASH(c.ss_item_sk)]
|  |  mem-estimate=0B mem-reservation=0B
|  |  tuple-ids=2 row-size=100B cardinality=2879987999
|  |
|  F02:PLAN FRAGMENT [RANDOM] hosts=9 instances=9
|  Per-Host Resources: mem-estimate=1.89GB mem-reservation=0B
|  02:SCAN HDFS [tpcds_1000_parquet.store_sales c, RANDOM]
|     partitions=1824/1824 files=1824 size=189.24GB
|     stored statistics:
|       table: rows=2879987999 size=unavailable
|       partitions: 1824/1824 rows=2879987999
|       columns: all
|     extrapolated-rows=disabled
|     mem-estimate=1.89GB mem-reservation=0B
|     tuple-ids=2 row-size=100B cardinality=2879987999
|
06:EXCHANGE [HASH(b.i_item_sk)]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=0,1 row-size=811B cardinality=1500
|
F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
Per-Host Resources: mem-estimate=323.88MB mem-reservation=19.88MB runtime-filters-memory=17.00MB
03:HASH JOIN [INNER JOIN, BROADCAST]
|  hash predicates: a.p_item_sk = b.i_item_sk
|  fk/pk conjuncts: a.p_item_sk = b.i_item_sk
|  runtime filters: RF002[bloom] <- b.i_item_sk
|  mem-estimate=2.88MB mem-reservation=2.88MB spill-buffer=128.00KB
|  tuple-ids=0,1 row-size=811B cardinality=1500
|
|--05:EXCHANGE [BROADCAST]
|  |  mem-estimate=0B mem-reservation=0B
|  |  tuple-ids=1 row-size=496B cardinality=3226
|  |
|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
|  Per-Host Resources: mem-estimate=896.00MB mem-reservation=16.00MB runtime-filters-memory=16.00MB
|  01:SCAN HDFS [tpcds_1000_parquet.item b, RANDOM]
|     partitions=1/1 files=1 size=28.28MB
|     predicates: b.i_color = 'xxx'
|     runtime filters: RF000[bloom] -> b.i_item_sk
|     stored statistics:
|       table: rows=300000 size=28.28MB
|       columns: all
|     extrapolated-rows=disabled
|     parquet statistics predicates: b.i_color = 'xxx'
|     parquet dictionary predicates: b.i_color = 'xxx'
|     mem-estimate=880.00MB mem-reservation=0B
|     tuple-ids=1 row-size=496B cardinality=3226
|
00:SCAN HDFS [tpcds_1000_parquet.promotion a, RANDOM]
   partitions=1/1 files=1 size=85.50KB
   runtime filters: RF000[bloom] -> a.p_item_sk, RF002[bloom] -> a.p_item_sk
   stored statistics:
     table: rows=1500 size=85.50KB
     columns: all
   extrapolated-rows=disabled
   mem-estimate=304.00MB mem-reservation=0B
   tuple-ids=0 row-size=315B cardinality=1500


Here, because of the equi-joins, we have a slot equivalence across all of
the item_sk columns (a.p_item_sk, b.i_item_sk, and c.ss_item_sk). So it
seems it would be valid to take the runtime filter RF002 (generated from
b.i_item_sk at HASH JOIN node 03) and apply it at scan node 02 for
store_sales. Doing so would be extremely beneficial in this manufactured
query, since the scan of 'item' takes only a second or so to determine that
there are in fact no items with color 'xxx'. That could completely
short-circuit scan 02 of the very large store_sales table.
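
To illustrate the slot-equivalence argument, here is a minimal sketch that groups the join columns from the query into equivalence classes with a small union-find. The class SlotEquivalenceSketch and its methods are hypothetical names made up for this example; this is not how Impala's analyzer actually tracks equivalences, only a toy model of the reasoning.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: hypothetical class, not Impala's analyzer code.
public class SlotEquivalenceSketch {
  // Maps each slot name to its parent in the union-find forest.
  private final Map<String, String> parent = new HashMap<>();

  // Returns the representative of the slot's equivalence class.
  String find(String slot) {
    parent.putIfAbsent(slot, slot);
    String p = parent.get(slot);
    if (!p.equals(slot)) {
      p = find(p);
      parent.put(slot, p); // path compression
    }
    return p;
  }

  // Records an equi-join predicate lhs = rhs by merging the two classes.
  void addEquiJoin(String lhs, String rhs) {
    parent.put(find(lhs), find(rhs));
  }

  boolean equivalent(String a, String b) {
    return find(a).equals(find(b));
  }

  public static void main(String[] args) {
    SlotEquivalenceSketch eq = new SlotEquivalenceSketch();
    // The two join predicates from the example query.
    eq.addEquiJoin("a.p_item_sk", "b.i_item_sk");
    eq.addEquiJoin("b.i_item_sk", "c.ss_item_sk");
    // RF002 is built on b.i_item_sk; is c.ss_item_sk in the same class?
    System.out.println(eq.equivalent("b.i_item_sk", "c.ss_item_sk"));
    // An unrelated column stays in its own class.
    System.out.println(eq.equivalent("b.i_color", "c.ss_item_sk"));
  }
}
```

Under this model, b.i_item_sk and c.ss_item_sk land in the same class, which is the sense in which a filter built on one could in principle be evaluated against the other.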

Obviously this query is contrived, but I'm seeing some cases in TPCDS (e.g.
Q64) where a much more complex variant of this plan ends up being generated
and fails to utilize an available runtime filter.

I spent some time digging in the code, and I think the faulty logic might
be here in RuntimeFilterGenerator.java:

      generateFilters(ctx, root.getChild(0));
      // Finalize every runtime filter of that join. This is to ensure that we don't
      // assign a filter to a scan node from the right subtree of joinNode or ancestor
      // join nodes in case we don't find a destination node in the left subtree.
      for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);

I'm not quite sure how to understand the comment, but it seems this is
preventing the runtime filter at the hash join from being sent to any node
that is either in its right subtree or higher up the tree. In other
words, it's limiting the runtime filter to _only_ be applied at nodes in
its left subtree.

Per my example query above, I think this is unnecessarily restrictive.
Based on my understanding of Impala scheduling/parallelism, all of the
build sides of joins start running in parallel from the bottom up, and it's
always possible for a build to complete somewhere deep in the tree before a
build completes higher up the tree, in which case sending the RF to the
hash join's "cousin" is beneficial. I think the only necessary restriction
is that an RF should not be sent from a hash join node to any descendant of
its right child.
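
To make the proposal concrete, here is a minimal sketch that compares the two targeting rules on a toy version of the plan above (exchanges omitted). Everything in it is hypothetical and for illustration only: the Node class, leftSubtreeTargets, and relaxedTargets are not Impala code, and the sketch assumes every scan exposes a slot equivalent to the filter's source column, as is the case in the example query.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical classes, not RuntimeFilterGenerator code.
public class FilterTargetSketch {
  // A scan has no children; a join has a probe (left) and a build (right) child.
  static final class Node {
    final String label;
    final Node left;
    final Node right;

    Node(String label, Node left, Node right) {
      this.label = label;
      this.left = left;
      this.right = right;
    }

    boolean isScan() { return left == null && right == null; }
  }

  static Node scan(String label) { return new Node(label, null, null); }

  static Node join(String label, Node probe, Node build) {
    return new Node(label, probe, build);
  }

  // Collects scan labels under n, left to right.
  static void collectScans(Node n, List<String> out) {
    if (n == null) return;
    if (n.isScan()) { out.add(n.label); return; }
    collectScans(n.left, out);
    collectScans(n.right, out);
  }

  // The behavior as I read it today: only scans in the join's left subtree.
  static List<String> leftSubtreeTargets(Node joinNode) {
    List<String> out = new ArrayList<>();
    collectScans(joinNode.left, out);
    return out;
  }

  // The proposed rule: any scan in the plan except those under the join's right child.
  static List<String> relaxedTargets(Node root, Node joinNode) {
    List<String> all = new ArrayList<>();
    collectScans(root, all);
    List<String> buildSide = new ArrayList<>();
    collectScans(joinNode.right, buildSide);
    all.removeAll(buildSide);
    return all;
  }

  public static void main(String[] args) {
    Node join03 = join("03:HASH JOIN", scan("00:promotion"), scan("01:item"));
    Node join04 = join("04:HASH JOIN", join03, scan("02:store_sales"));
    // Candidate targets for RF002, which is produced at join 03.
    System.out.println(leftSubtreeTargets(join03));
    System.out.println(relaxedTargets(join04, join03));
  }
}
```

With the left-subtree rule, the filter from join 03 can only reach the promotion scan; with the relaxed rule, the store_sales scan (the "cousin") becomes a candidate as well, while the item scan that feeds the build side stays excluded.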

Keep in mind I'm very new to the Impala planner code and particularly to
the runtime filter portion thereof, so I may have missed something. But
does the above sound like a plausible bug/missed optimization?

-Todd
-- 
Todd Lipcon
Software Engineer, Cloudera
