(If folks are interested in a refresher, https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html is useful.)
I stared at that code for a bit, and I agree with you that it's plausible. I'm
also confused by the "bottom-up" comment of generateFilters(): it seems like we
walk the plan depth first, and the assignment happens in that depth-first walk,
before all the runtime filters are generated. A concern is making sure that
there are no outer joins impacting correctness.

-- Philip

On Wed, Jan 30, 2019 at 9:36 PM Todd Lipcon <t...@cloudera.com> wrote:

> Hey folks,
>
> I've been digging into a couple of TPCDS queries that are unexpectedly slow
> and uncovered what seems to be some surprising behavior in the planner
> concerning runtime filter assignment. Consider the following query in the
> TPCDS schema:
>
> select straight_join *
> from promotion a
> join item b on a.p_item_sk = b.i_item_sk
> join [shuffle] store_sales c on b.i_item_sk = c.ss_item_sk
> where b.i_color = 'xxx'
>
> This generates a plan that looks like this:
> http://people.apache.org/~todd/plan.png
>
> Or in text form:
>
> +-----------------------------------------------------------------------------------------------------+
> | Explain String
> +-----------------------------------------------------------------------------------------------------+
> | Max Per-Host Resource Reservation: Memory=85.88MB
> | Per-Host Resource Estimates: Memory=298.14GB
> |
> | F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
> | | Per-Host Resources: mem-estimate=0B mem-reservation=0B
> | PLAN-ROOT SINK
> | | mem-estimate=0B mem-reservation=0B
> | |
> | 08:EXCHANGE [UNPARTITIONED]
> | | mem-estimate=0B mem-reservation=0B
> | | tuple-ids=0,1,2 row-size=911B cardinality=14201171
> | |
> | F03:PLAN FRAGMENT [HASH(b.i_item_sk)] hosts=1 instances=1
> | Per-Host Resources: mem-estimate=295.06GB mem-reservation=50.00MB runtime-filters-memory=16.00MB
> | 04:HASH JOIN [INNER JOIN, PARTITIONED]
> | | hash predicates: b.i_item_sk = c.ss_item_sk
> | | fk/pk conjuncts: none
> | | runtime filters: RF000[bloom] <- c.ss_item_sk
> | | mem-estimate=295.04GB mem-reservation=34.00MB spill-buffer=2.00MB
> | | tuple-ids=0,1,2 row-size=911B cardinality=14201171
> | |
> | |--07:EXCHANGE [HASH(c.ss_item_sk)]
> | | | mem-estimate=0B mem-reservation=0B
> | | | tuple-ids=2 row-size=100B cardinality=2879987999
> | | |
> | | F02:PLAN FRAGMENT [RANDOM] hosts=9 instances=9
> | | Per-Host Resources: mem-estimate=1.89GB mem-reservation=0B
> | | 02:SCAN HDFS [tpcds_1000_parquet.store_sales c, RANDOM]
> | | partitions=1824/1824 files=1824 size=189.24GB
> | | stored statistics:
> | | table: rows=2879987999 size=unavailable
> | | partitions: 1824/1824 rows=2879987999
> | | columns: all
> | | extrapolated-rows=disabled
> | | mem-estimate=1.89GB mem-reservation=0B
> | | tuple-ids=2 row-size=100B cardinality=2879987999
> | |
> | 06:EXCHANGE [HASH(b.i_item_sk)]
> | | mem-estimate=0B mem-reservation=0B
> | | tuple-ids=0,1 row-size=811B cardinality=1500
> | |
> | F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
> | Per-Host Resources: mem-estimate=323.88MB mem-reservation=19.88MB runtime-filters-memory=17.00MB
> | 03:HASH JOIN [INNER JOIN, BROADCAST]
> | | hash predicates: a.p_item_sk = b.i_item_sk
> | | fk/pk conjuncts: a.p_item_sk = b.i_item_sk
> | | runtime filters: RF002[bloom] <- b.i_item_sk
> | | mem-estimate=2.88MB mem-reservation=2.88MB spill-buffer=128.00KB
> | | tuple-ids=0,1 row-size=811B cardinality=1500
> | |
> | |--05:EXCHANGE [BROADCAST]
> | | | mem-estimate=0B mem-reservation=0B
> | | | tuple-ids=1 row-size=496B cardinality=3226
> | | |
> | | F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
> | | Per-Host Resources: mem-estimate=896.00MB mem-reservation=16.00MB runtime-filters-memory=16.00MB
> | | 01:SCAN HDFS [tpcds_1000_parquet.item b, RANDOM]
> | | partitions=1/1 files=1 size=28.28MB
> | | predicates: b.i_color = 'xxx'
> | | runtime filters: RF000[bloom] -> b.i_item_sk
> | | stored statistics:
> | | table: rows=300000 size=28.28MB
> | | columns: all
> | | extrapolated-rows=disabled
> | | parquet statistics predicates: b.i_color = 'xxx'
> | | parquet dictionary predicates: b.i_color = 'xxx'
> | | mem-estimate=880.00MB mem-reservation=0B
> | | tuple-ids=1 row-size=496B cardinality=3226
> | |
> | 00:SCAN HDFS [tpcds_1000_parquet.promotion a, RANDOM]
> | partitions=1/1 files=1 size=85.50KB
> | runtime filters: RF000[bloom] -> a.p_item_sk, RF002[bloom] -> a.p_item_sk
> | stored statistics:
> | table: rows=1500 size=85.50KB
> | columns: all
> | extrapolated-rows=disabled
> | mem-estimate=304.00MB mem-reservation=0B
> | tuple-ids=0 row-size=315B cardinality=1500
> +-----------------------------------------------------------------------------------------------------+
>
> Here, because of the equi-joins, we have a slot equivalence for all of the
> 'item_sk' columns. So it seems it would be valid to take the runtime filter
> RF002 (generated from b.i_item_sk at HASH JOIN node 03) and apply it at scan
> node 02 for store_sales. Notably, doing so is extremely beneficial in this
> manufactured query, since the scan of 'item' takes only a second or so to
> determine that in fact there are no items with color 'xxx'. This could
> completely short-circuit scan 02 of the very large store_sales table.
>
> Obviously this query is canned, but I'm seeing some cases in TPCDS (e.g. Q64)
> where a much more complex variant of this plan ends up being generated and
> failing to utilize an available runtime filter.
>
> I spent some time digging into the code, and I think the faulty logic might
> be here in RuntimeFilterGenerator.java:
>
>   generateFilters(ctx, root.getChild(0));
>   // Finalize every runtime filter of that join. This is to ensure that we don't
>   // assign a filter to a scan node from the right subtree of joinNode or ancestor
>   // join nodes in case we don't find a destination node in the left subtree.
>   for (RuntimeFilter runtimeFilter: filters)
>     finalizeRuntimeFilter(runtimeFilter);
>
> I'm not quite sure how to understand the comment, but it seems this is
> preventing the runtime filter at the hash join from being sent to any node
> that is either a descendant of its right child or higher up the tree. In
> other words, it's limiting the runtime filter to _only_ be applied at nodes
> in its left subtree.
>
> Per my example query above, I think this is unnecessarily restrictive. Based
> on my understanding of Impala scheduling/parallelism, all of the build sides
> of joins start running in parallel from the bottom up, and it's always
> possible for a build to complete somewhere deep in the tree before a build
> completes higher up the tree, in which case sending the RF to the hash join's
> "cousin" is beneficial. I think the only necessary restriction is that an RF
> should not be sent from a hash join node to any descendant of its right
> child.
>
> Keep in mind I'm very new to the Impala planner code and particularly to the
> runtime filter portion thereof, so I may have missed something. But does the
> above sound like a plausible bug/missed optimization?
>
> -Todd
> --
> Todd Lipcon
> Software Engineer, Cloudera
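
To make the subtree argument above concrete, here is a minimal, self-contained
sketch of the two eligibility rules being contrasted. This is not Impala code:
the PlanNode, eligibleToday, and eligibleProposed names are hypothetical, the
real logic lives in RuntimeFilterGenerator.java against the planner's own node
classes, and the sketch ignores slot equivalence and the outer-join correctness
concern raised above. It only shows, for the plan shape in the example query,
why a "left subtree only" rule excludes the store_sales scan while an "anything
outside the right (build) subtree" rule admits it.

    // Hypothetical classes for illustration only; not Impala's planner API.
    import java.util.ArrayList;
    import java.util.List;

    public class RuntimeFilterTargetSketch {
      static class PlanNode {
        final String name;
        final List<PlanNode> children = new ArrayList<>();
        PlanNode(String name, PlanNode... kids) {
          this.name = name;
          for (PlanNode k : kids) children.add(k);
        }
        // True if 'target' is this node or appears anywhere beneath it.
        boolean contains(PlanNode target) {
          if (this == target) return true;
          for (PlanNode c : children) {
            if (c.contains(target)) return true;
          }
          return false;
        }
      }

      // Current behavior as read from the finalizeRuntimeFilter() comment: a
      // filter produced at 'join' may only target scans in its left (probe) subtree.
      static boolean eligibleToday(PlanNode join, PlanNode scan) {
        return join.children.get(0).contains(scan);
      }

      // Proposed relaxation: any scan is eligible except those under the join's
      // right (build) child, since the filter is built from that side.
      static boolean eligibleProposed(PlanNode join, PlanNode scan) {
        return !join.children.get(1).contains(scan);
      }

      public static void main(String[] args) {
        // Shape of the example plan:
        //   04:HASH JOIN(03:HASH JOIN(promotion, item), store_sales)
        PlanNode scanPromotion = new PlanNode("00:SCAN promotion");
        PlanNode scanItem = new PlanNode("01:SCAN item");
        PlanNode scanStoreSales = new PlanNode("02:SCAN store_sales");
        PlanNode join03 = new PlanNode("03:HASH JOIN", scanPromotion, scanItem);
        PlanNode join04 = new PlanNode("04:HASH JOIN", join03, scanStoreSales);

        // RF002 is produced at join 03 from its build side (item). Under the
        // current rule it cannot reach the store_sales scan; under the proposed
        // rule it can, because that scan is not in join 03's build subtree.
        System.out.println("current rule allows RF002 -> store_sales scan:  "
            + eligibleToday(join03, scanStoreSales));     // false
        System.out.println("proposed rule allows RF002 -> store_sales scan: "
            + eligibleProposed(join03, scanStoreSales));  // true
        // join04 exists only to show where store_sales sits relative to join 03.
        assert join04.contains(scanStoreSales);
      }
    }

Whether the relaxed rule is safe in general is exactly the open question in the
thread: outer joins, and the order in which builds actually finish at runtime,
could still rule out some of the extra targets this sketch admits.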