(If folks are interested in a refresher,
https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html
is useful.)

I stared at that code for a bit, and I agree with you that it's plausible.
I'm also confused by the "bottom-up" comment of generateFilters(): it seems
like we walk the plan depth first, and the assignment happens in that
depth-first walk, before all the runtime filters are generated.
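
Here's a toy model of what I mean (hand-written sketch with made-up
types, not the actual planner code). Reading generateFilters() as
"generate, walk left, finalize, walk right", a join's filters are
finalized as soon as its left subtree has been walked, so a join sitting
in someone's right subtree hasn't even generated its filters yet:

    // Toy sketch only; Node and walk() are invented for illustration.
    class Node {
      final String name;
      final Node left, right;  // left = probe subtree, right = build subtree
      Node(String name, Node left, Node right) {
        this.name = name; this.left = left; this.right = right;
      }
    }

    public class WalkOrder {
      static void walk(Node n) {
        if (n == null) return;
        boolean isJoin = n.name.startsWith("join");
        if (isJoin) System.out.println("generate filters at " + n.name);
        walk(n.left);
        // Mirrors the quoted snippet: finalize right after the left subtree.
        if (isJoin) System.out.println("finalize filters of " + n.name);
        walk(n.right);
      }

      public static void main(String[] args) {
        Node plan = new Node("joinTop",
            new Node("scanX", null, null),
            new Node("joinBottom",
                new Node("scanY", null, null),
                new Node("scanZ", null, null)));
        walk(plan);
        // Prints: generate joinTop, finalize joinTop, generate joinBottom,
        // finalize joinBottom. joinTop is finalized before joinBottom's
        // filters exist, which is what reads as "not bottom-up" to me.
      }
    }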

One concern is making sure that outer joins can't impact correctness.
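
For instance (a hypothetical variant of Todd's query, not one I've run):

    select *
      from promotion a
      left outer join item b on a.p_item_sk = b.i_item_sk

A filter built from b.i_item_sk must not prune the scan of promotion
here, since promotion rows with no matching item still have to come out
(with NULLs for the item columns). Whatever restriction we relax needs
to keep cases like that safe.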

-- Philip

On Wed, Jan 30, 2019 at 9:36 PM Todd Lipcon <t...@cloudera.com> wrote:

> Hey folks,
>
> I've been digging into a couple of TPCDS queries that are unexpectedly slow
> and uncovered what seems to be some surprising behavior in the planner
> concerning runtime filter assignment. Consider the following query in the
> TPCDS schema:
>
> select straight_join *
>   from promotion a
>   join item b on a.p_item_sk = b.i_item_sk
>   join [shuffle] store_sales c on b.i_item_sk = c.ss_item_sk
>   where b.i_color = 'xxx'
>
> This generates a plan that looks like this:
> http://people.apache.org/~todd/plan.png
>
> Or in text form:
>
> Max Per-Host Resource Reservation: Memory=85.88MB
> Per-Host Resource Estimates: Memory=298.14GB
>
> F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
> |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
> PLAN-ROOT SINK
> |  mem-estimate=0B mem-reservation=0B
> |
> 08:EXCHANGE [UNPARTITIONED]
> |  mem-estimate=0B mem-reservation=0B
> |  tuple-ids=0,1,2 row-size=911B cardinality=14201171
> |
> F03:PLAN FRAGMENT [HASH(b.i_item_sk)] hosts=1 instances=1
> Per-Host Resources: mem-estimate=295.06GB mem-reservation=50.00MB runtime-filters-memory=16.00MB
> 04:HASH JOIN [INNER JOIN, PARTITIONED]
> |  hash predicates: b.i_item_sk = c.ss_item_sk
> |  fk/pk conjuncts: none
> |  runtime filters: RF000[bloom] <- c.ss_item_sk
> |  mem-estimate=295.04GB mem-reservation=34.00MB spill-buffer=2.00MB
> |  tuple-ids=0,1,2 row-size=911B cardinality=14201171
> |
> |--07:EXCHANGE [HASH(c.ss_item_sk)]
> |  |  mem-estimate=0B mem-reservation=0B
> |  |  tuple-ids=2 row-size=100B cardinality=2879987999
> |  |
> |  F02:PLAN FRAGMENT [RANDOM] hosts=9 instances=9
> |  Per-Host Resources: mem-estimate=1.89GB mem-reservation=0B
> |  02:SCAN HDFS [tpcds_1000_parquet.store_sales c, RANDOM]
> |     partitions=1824/1824 files=1824 size=189.24GB
> |     stored statistics:
> |       table: rows=2879987999 size=unavailable
> |       partitions: 1824/1824 rows=2879987999
> |       columns: all
> |     extrapolated-rows=disabled
> |     mem-estimate=1.89GB mem-reservation=0B
> |     tuple-ids=2 row-size=100B cardinality=2879987999
> |
> 06:EXCHANGE [HASH(b.i_item_sk)]
> |  mem-estimate=0B mem-reservation=0B
> |  tuple-ids=0,1 row-size=811B cardinality=1500
> |
> F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
> Per-Host Resources: mem-estimate=323.88MB mem-reservation=19.88MB runtime-filters-memory=17.00MB
> 03:HASH JOIN [INNER JOIN, BROADCAST]
> |  hash predicates: a.p_item_sk = b.i_item_sk
> |  fk/pk conjuncts: a.p_item_sk = b.i_item_sk
> |  runtime filters: RF002[bloom] <- b.i_item_sk
> |  mem-estimate=2.88MB mem-reservation=2.88MB spill-buffer=128.00KB
> |  tuple-ids=0,1 row-size=811B cardinality=1500
> |
> |--05:EXCHANGE [BROADCAST]
> |  |  mem-estimate=0B mem-reservation=0B
> |  |  tuple-ids=1 row-size=496B cardinality=3226
> |  |
> |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
> |  Per-Host Resources: mem-estimate=896.00MB mem-reservation=16.00MB runtime-filters-memory=16.00MB
> |  01:SCAN HDFS [tpcds_1000_parquet.item b, RANDOM]
> |     partitions=1/1 files=1 size=28.28MB
> |     predicates: b.i_color = 'xxx'
> |     runtime filters: RF000[bloom] -> b.i_item_sk
> |     stored statistics:
> |       table: rows=300000 size=28.28MB
> |       columns: all
> |     extrapolated-rows=disabled
> |     parquet statistics predicates: b.i_color = 'xxx'
> |     parquet dictionary predicates: b.i_color = 'xxx'
> |     mem-estimate=880.00MB mem-reservation=0B
> |     tuple-ids=1 row-size=496B cardinality=3226
> |
> 00:SCAN HDFS [tpcds_1000_parquet.promotion a, RANDOM]
>    partitions=1/1 files=1 size=85.50KB
>    runtime filters: RF000[bloom] -> a.p_item_sk, RF002[bloom] -> a.p_item_sk
>    stored statistics:
>      table: rows=1500 size=85.50KB
>      columns: all
>    extrapolated-rows=disabled
>    mem-estimate=304.00MB mem-reservation=0B
>    tuple-ids=0 row-size=315B cardinality=1500
>
>
> Here, because of the equi-joins, we have a slot equivalence for all of the
> 'item_sk' columns. So it seems it would be valid to take the runtime
> filter RF002 (generated from b.i_item_sk at HASH JOIN node 03) and apply
> it at scan node 02 for store_sales. Notably, doing so would be extremely
> beneficial in this manufactured query, since the scan of 'item' takes only
> a second or so to determine that in fact there are no items with color
> 'xxx'. This could completely short-circuit scan 02 of the very large
> store_sales table.
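>
> That is, at scan 02 I'd hope to see something like the following
> (hand-edited illustration, not actual planner output):
>
>   02:SCAN HDFS [tpcds_1000_parquet.store_sales c, RANDOM]
>      runtime filters: RF002[bloom] -> c.ss_item_sk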
>
> Obviously this query is canned, but I'm seeing some cases in TPCDS (e.g.
> Q64) where a much more complex variant of this plan ends up being
> generated and failing to utilize an available runtime filter.
>
> I spent some time digging in the code, and I think the faulty logic might
> be here in RuntimeFilterGenerator.java:
>
>       generateFilters(ctx, root.getChild(0));
>       // Finalize every runtime filter of that join. This is to ensure that we don't
>       // assign a filter to a scan node from the right subtree of joinNode or ancestor
>       // join nodes in case we don't find a destination node in the left subtree.
>       for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);
>
> I'm not quite sure how to understand the comment, but it seems this is
> preventing the runtime filter at the hash join from being sent to any node
> that is either in its right subtree or higher up the tree. In other
> words, it's limiting the runtime filter to _only_ be applied at nodes in
> its left subtree.
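>
> To illustrate on the plan above (tree hand-drawn by me):
>
>              04:HASH JOIN  (builds RF000)
>             /             \
>   03:HASH JOIN (RF002)     02:SCAN store_sales  (today RF002 can't reach this)
>      /         \
>   00:SCAN a    01:SCAN b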
>
> Per my above example query, I think this is unnecessarily restrictive.
> Based on my understanding of Impala scheduling/parallelism, all of the
> build sides of joins start running in parallel from the bottom up, and it's
> always possible for a build to complete somewhere deep in the tree before a
> build completes higher up the tree, in which case sending the RF to the
> hash join's "cousin" is beneficial. I think the only necessary restriction
> is that an RF should not be sent from a hash join node to any descendant of
> its right child; see the sketch below.
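>
> A toy sketch of the check I have in mind (made-up types, not the
> planner's actual API):
>
>   // A filter produced at 'join' can't reach scans under the join's own
>   // build (right) child, since the filter only exists once that build
>   // finishes; anywhere else in the plan should be fair game, modulo the
>   // usual slot-equivalence requirements.
>   class PlanNode {
>     PlanNode left, right;  // left = probe subtree, right = build subtree
>   }
>
>   class FilterTargeting {
>     static boolean isValidTarget(PlanNode join, PlanNode scan) {
>       return !inSubtree(join.right, scan);
>     }
>     static boolean inSubtree(PlanNode root, PlanNode n) {
>       if (root == null) return false;
>       if (root == n) return true;
>       return inSubtree(root.left, n) || inSubtree(root.right, n);
>     }
>   }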
>
> Keep in mind I'm very new to the Impala planner code and particularly to
> the runtime filter portion thereof, so I may have missed something. But
> does the above sound like a plausible bug/missed optimization?
>
> -Todd
> --
> Todd Lipcon
> Software Engineer, Cloudera
>
