Hi Jie, Thank you for the response. Dynamic pruning work to filter prune
the first join not the second one. so in the example I shared above.
big_table is partition pruned but bigger_table is not.
Here's the result of running explain extended on the following query:

Select * FROM jlee_ntm.tt_om_po AS small_table
  INNER JOIN jlee_ntm.TT_OM_POLINE AS big_table ON
    small_table.year=big_table.year
  INNER JOIN jlee_ntm.TT_OM_POLINE_SHIPMENT AS bigger_table ON
    big_table.month=bigger_table.month
  WHERE
    small_table.createtime < "2003"


In the fictional query above: big_table is partitioned by year and
bigger_table is partitioned by month.


== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('small_table.createtime < 2003)
+- 'Join Inner, ('big_table.month = 'bigger_table.month)
:- 'Join Inner, ('small_table.year = 'big_table.year)
: :- 'SubqueryAlias small_table
: : +- 'UnresolvedRelation [default, small_table], [], false
: +- 'SubqueryAlias big_table
: +- 'UnresolvedRelation [default, big_table], [], false
+- 'SubqueryAlias bigger_table
+- 'UnresolvedRelation [default, bigger_table], [], false

== Analyzed Logical Plan ==

Project [ ... 281 more fields]
+- Filter (createtime#6730 < cast(2003 as timestamp))
+- Join Inner, (month#6924 = month#7098)
:- Join Inner, (year#6774 = year#6923)
: :- SubqueryAlias small_table
: : +- Relation default.small_table[... 44 more fields] parquet
: +- SubqueryAlias big_table
: +- Relation default.big_table[... 127 more fields] parquet
+- SubqueryAlias bigger_table
+- Relation default.bigger_table[... 62 more fields] parquet

== Optimized Logical Plan ==
Join Inner, (month#6924 = month#7098),
leftHint=(dynamicPruningFilterId=Some(7104))
:- Join Inner, (year#6774 = year#6923),
leftHint=(dynamicPruningFilterId=Some(7102))
: :- Filter ((isnotnull(createtime#6730) AND (createtime#6730 < 2003-01-01
00:00:00)) AND isnotnull(year#6774))
: : +- Filter ((year#6774 < year(cast(2003-01-01 00:00:00 as date))) OR
((year#6774 = year(cast(2003-01-01 00:00:00 as date))) AND (month#6775 <=
month(cast(2003-01-01 00:00:00 as date)))))
: : +- Relation default.small_table[... 44 more fields] parquet
: +- Filter ((isnotnull(year#6923) AND isnotnull(month#6924)) AND
dynamicpruning#7103 7102)
: +- Relation default.big_table[... 127 more fields] parquet
+- Filter (isnotnull(month#7098) AND dynamicpruning#7105 7104)
+- Relation default.bigger_table[... 62 more fields] parquet

== Physical Plan ==
BroadcastHashJoin [month#6924], [month#7098], Inner, BuildRight, false
:- BroadcastHashJoin [year#6774], [year#6923], Inner, BuildLeft, false
: :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[66, int,
true] as bigint)),false), [plan_id=1487]
: : +- *(1) Filter (isnotnull(createtime#6730) AND (createtime#6730 <
2003-01-01 00:00:00))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.small_table[... 44 more fields] Batched:
true, DataFilters: [isnotnull(createtime#6730), (createtime#6730 <
2003-01-01 00:00:00)], Format: Parquet, Location: PreparedDeltaFileIndex(1
paths)[..., PartitionFilters: [((year#6774 < year(cast(2003-01-01 00:00:00
as date))) OR ((year#6774 = year(cast(2003-01-01 00:..., PushedFilters:
[IsNotNull(createtime), LessThan(createtime,2003-01-01 00:00:00.0)],
ReadSchema: ...
: +- Project [ ... 127 more fields]
: +- FileScan parquet default.big_table[... 127 more fields] Batched:
false, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1
paths)[..., PartitionFilters: [isnotnull(year#6923), isnotnull(month#6924),
dynamicpruningexpression(year#6923 IN dynamicprunin..., PushedFilters: [],
ReadSchema: ...
: +- SubqueryBroadcast dynamicpruning#7103, 0, [year#6774], false, [id=#1511
]
: +- ReusedExchange [ ... 44 more fields], BroadcastExchange
HashedRelationBroadcastMode(List(cast(input[66, int, true] as
bigint)),false), [plan_id=1487]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[85, int,
true] as bigint)),false), [plan_id=1502]
+- *(2) ColumnarToRow
+- FileScan parquet default.bigger_table[.... 62 more fields] Batched:
true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1
paths)[......, PartitionFilters: [isnotnull(month#7098),
dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: ...


The interesting part is that I'm seeing a reference to dynamicpruning#7104
in the optimized logical plan of the default.bigger_table relation.
However, in the physical plan, it translates
into dynamicpruningexpression(true) which I assume doesn't filter anything.
Not sure why that's the case. in the query plan above, bigger_table will be
fully loaded.

On Fri, Nov 11, 2022 at 5:53 AM Jie Han <tunyu...@gmail.com> wrote:

> Which version are you using? I test it in spark 3.2.1 and sure that
> dynamic pruning works in queries with multi joins.
> BTW, could you execute ‘explain extended your sql’?
>
> > 2022年11月10日 02:10,hajyoussef amine <hajyoussef.am...@gmail.com> 写道:
> >
> > Hello everyone,
> >
> > Let me take the following spark sql example to demonstrate the issue
> we're having:
> >
> > ```
> > Select * FROM small_table
> >     Inner join big_table on small_table.foreign_key =
> big_table.partition_key
> >     Inner join bigger_table on big_table.foreign_key =
> bigger_table.partition_key
> >   where small_date.date="2022-01-01"
> > ```
> >
> > The efficient execution plan in the case above is to load the small
> table filtered by date(resulting in a few rows), use it to partition prune
> the big_table so only relevant partitions are loaded, then join the two
> tables together, and use the join result to partition prune the bigger
> table.
> >
> > I can't find a way to easily implement the strategy above in spark.
> Dynamic partition pruning seems to support only one level of depth, so the
> big_table is partition pruned, but the bigger table is always fully loaded.
> >
> > Spark tries to parallelize things so it loads all the tables in
> parallel. Interestingly, however, that is not the ideal approach in this
> case. I'm not sure if spark has a mechanism to cancel pending tasks and
> adaptively change physical execution strategy as new information comes
> in(in this for example, spark ideally cancels loading the bigger_table,
> after the small_table big_table join result is available and a small amount
> of rows are returned. spark can use the resulting rows to partition prune
> the bigger table assuming partition keys are in the join condition)
> >
> > The only way I found to implement the strategy is to break the
> computation in two steps, persist the first join result into disk, and then
> load it and use it to partition and prune the bigger table. The code will
> be something like this:
> >
> >
> > ```
> > spark.sql("""
> > Select * FROM small_table
> >     Inner join big_table on small_table.foreign_key =
> big_table.partition_key
> >     where small_date.date="2022-01-01"
> >
>  """).write.format("parquet").mode("overwrite").save("path/to/test.parquet")
> >
> >
> spark.read.format("parquet").load("path/to/test.parquet").createOrReplaceTempView("step1")
> >
> > spark.sql("""
> > Select * FROM step_1
> >     Inner join bigger_table on step_1.foreign_key =
> bigger_table.partition_key
> >     where step_1.date="2022-01-01"
> > """).collect()
> > ```
> >
> > I could not get `persist` to trigger computation for some reason(even
> after running `count` on it), that's why I had to save it into a parquet,
> and then reload it.
> >
> >
> > The issue with the code above apart from having to save it in disk is
> that it requires manual rewriting queries which is not convenient
> especially for queries with multiple joins.
> >
> >
> > I'm looking for some insights on how to efficiently execute the query
> above without having to fetch full data of the joined tables.
>
>

Reply via email to