Hi Jie,

Thank you for the response. Dynamic pruning kicks in for the first join but not the second one, so in the example I shared above, big_table is partition pruned but bigger_table is not. Here's the result of running `explain extended` on the following query:
```
SELECT * FROM jlee_ntm.tt_om_po AS small_table
INNER JOIN jlee_ntm.TT_OM_POLINE AS big_table
  ON small_table.year = big_table.year
INNER JOIN jlee_ntm.TT_OM_POLINE_SHIPMENT AS bigger_table
  ON big_table.month = bigger_table.month
WHERE small_table.createtime < "2003"
```

In this fictional query, big_table is partitioned by year and bigger_table is partitioned by month.

```
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('small_table.createtime < 2003)
   +- 'Join Inner, ('big_table.month = 'bigger_table.month)
      :- 'Join Inner, ('small_table.year = 'big_table.year)
      :  :- 'SubqueryAlias small_table
      :  :  +- 'UnresolvedRelation [default, small_table], [], false
      :  +- 'SubqueryAlias big_table
      :     +- 'UnresolvedRelation [default, big_table], [], false
      +- 'SubqueryAlias bigger_table
         +- 'UnresolvedRelation [default, bigger_table], [], false

== Analyzed Logical Plan ==
Project [ ... 281 more fields]
+- Filter (createtime#6730 < cast(2003 as timestamp))
   +- Join Inner, (month#6924 = month#7098)
      :- Join Inner, (year#6774 = year#6923)
      :  :- SubqueryAlias small_table
      :  :  +- Relation default.small_table[... 44 more fields] parquet
      :  +- SubqueryAlias big_table
      :     +- Relation default.big_table[... 127 more fields] parquet
      +- SubqueryAlias bigger_table
         +- Relation default.bigger_table[... 62 more fields] parquet

== Optimized Logical Plan ==
Join Inner, (month#6924 = month#7098), leftHint=(dynamicPruningFilterId=Some(7104))
:- Join Inner, (year#6774 = year#6923), leftHint=(dynamicPruningFilterId=Some(7102))
:  :- Filter ((isnotnull(createtime#6730) AND (createtime#6730 < 2003-01-01 00:00:00)) AND isnotnull(year#6774))
:  :  +- Filter ((year#6774 < year(cast(2003-01-01 00:00:00 as date))) OR ((year#6774 = year(cast(2003-01-01 00:00:00 as date))) AND (month#6775 <= month(cast(2003-01-01 00:00:00 as date)))))
:  :     +- Relation default.small_table[... 44 more fields] parquet
:  +- Filter ((isnotnull(year#6923) AND isnotnull(month#6924)) AND dynamicpruning#7103 7102)
:     +- Relation default.big_table[... 127 more fields] parquet
+- Filter (isnotnull(month#7098) AND dynamicpruning#7105 7104)
   +- Relation default.bigger_table[... 62 more fields] parquet

== Physical Plan ==
BroadcastHashJoin [month#6924], [month#7098], Inner, BuildRight, false
:- BroadcastHashJoin [year#6774], [year#6923], Inner, BuildLeft, false
:  :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[66, int, true] as bigint)),false), [plan_id=1487]
:  :  +- *(1) Filter (isnotnull(createtime#6730) AND (createtime#6730 < 2003-01-01 00:00:00))
:  :     +- *(1) ColumnarToRow
:  :        +- FileScan parquet default.small_table[... 44 more fields] Batched: true, DataFilters: [isnotnull(createtime#6730), (createtime#6730 < 2003-01-01 00:00:00)], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[..., PartitionFilters: [((year#6774 < year(cast(2003-01-01 00:00:00 as date))) OR ((year#6774 = year(cast(2003-01-01 00:..., PushedFilters: [IsNotNull(createtime), LessThan(createtime,2003-01-01 00:00:00.0)], ReadSchema: ...
:  +- Project [ ... 127 more fields]
:     +- FileScan parquet default.big_table[... 127 more fields] Batched: false, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[..., PartitionFilters: [isnotnull(year#6923), isnotnull(month#6924), dynamicpruningexpression(year#6923 IN dynamicprunin..., PushedFilters: [], ReadSchema: ...
:           +- SubqueryBroadcast dynamicpruning#7103, 0, [year#6774], false, [id=#1511]
:              +- ReusedExchange [... 44 more fields], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[66, int, true] as bigint)),false), [plan_id=1487]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[85, int, true] as bigint)),false), [plan_id=1502]
   +- *(2) ColumnarToRow
      +- FileScan parquet default.bigger_table[... 62 more fields] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[..., PartitionFilters: [isnotnull(month#7098), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: ...
```
The interesting part is that I'm seeing a reference to dynamicpruning#7104 in the optimized logical plan for the default.bigger_table relation. In the physical plan, however, it translates into dynamicpruningexpression(true), which I assume doesn't filter anything; I'm not sure why that's the case. With the query plan above, bigger_table will be fully loaded.
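In case it's useful to anyone else debugging this, here is how I've been grepping for the DPP markers without scrolling through the whole plan (just a sketch; `query` stands for the SQL string above):

```python
# Ask Spark for the extended plan as text and print only the
# DPP-related lines. `query` is assumed to hold the SQL shown above.
plan_text = spark.sql("EXPLAIN EXTENDED " + query).collect()[0][0]
for line in plan_text.splitlines():
    if "dynamicpruning" in line.lower():
        print(line)
```

A scan that actually gets pruned shows something like `dynamicpruningexpression(year#6923 IN dynamicprunin...`, as on big_table above, while `dynamicpruningexpression(true)` means the filter was dropped when the physical plan was chosen, as on bigger_table.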
On Fri, Nov 11, 2022 at 5:53 AM Jie Han <tunyu...@gmail.com> wrote:

> Which version are you using? I tested it in Spark 3.2.1 and am sure that
> dynamic pruning works in queries with multiple joins.
> BTW, could you execute ‘explain extended your sql’?
>
> On Nov 10, 2022, at 02:10, hajyoussef amine <hajyoussef.am...@gmail.com> wrote:
> >
> > Hello everyone,
> >
> > Let me take the following Spark SQL example to demonstrate the issue
> > we're having:
> >
> > ```
> > Select * FROM small_table
> > Inner join big_table on small_table.foreign_key = big_table.partition_key
> > Inner join bigger_table on big_table.foreign_key = bigger_table.partition_key
> > where small_table.date = "2022-01-01"
> > ```
> >
> > The efficient execution plan in the case above is to load the small table
> > filtered by date (resulting in a few rows), use it to partition prune
> > big_table so only the relevant partitions are loaded, join the two tables
> > together, and then use the join result to partition prune the bigger table.
> >
> > I can't find a way to easily implement this strategy in Spark. Dynamic
> > partition pruning seems to support only one level of depth, so big_table
> > is partition pruned, but bigger_table is always fully loaded.
> >
> > Spark tries to parallelize things, so it loads all the tables in parallel.
> > Interestingly, that is not the ideal approach in this case. I'm not sure
> > whether Spark has a mechanism to cancel pending tasks and adaptively
> > change the physical execution strategy as new information comes in (here,
> > for example, Spark would ideally cancel loading bigger_table once the
> > small_table/big_table join result is available and a small number of rows
> > are returned; it could then use those rows to partition prune the bigger
> > table, assuming the partition keys appear in the join condition).
> >
> > The only way I found to implement the strategy is to break the computation
> > into two steps: persist the first join result to disk, then load it back
> > and use it to partition prune the bigger table. The code will be something
> > like this:
> >
> > ```
> > spark.sql("""
> > Select * FROM small_table
> > Inner join big_table on small_table.foreign_key = big_table.partition_key
> > where small_table.date = "2022-01-01"
> > """).write.format("parquet").mode("overwrite").save("path/to/test.parquet")
> >
> > spark.read.format("parquet").load("path/to/test.parquet").createOrReplaceTempView("step_1")
> >
> > spark.sql("""
> > Select * FROM step_1
> > Inner join bigger_table on step_1.foreign_key = bigger_table.partition_key
> > where step_1.date = "2022-01-01"
> > """).collect()
> > ```
> >
> > I could not get `persist` to trigger computation for some reason (even
> > after running `count` on it), which is why I had to save to parquet and
> > reload it.
> >
> > Apart from having to go through disk, the issue with the code above is
> > that it requires manually rewriting queries, which is not convenient,
> > especially for queries with multiple joins.
> >
> > I'm looking for some insights on how to efficiently execute the query
> > above without having to fetch the full data of the joined tables.
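P.S. One more note on the two-step workaround quoted above: a variant I've been experimenting with avoids the parquet round trip by collecting the distinct partition keys that survive the first join to the driver and pushing them back as a literal IN filter. This is only a sketch against the fictional query at the top (the variable names are mine, and it assumes the distinct key set is small enough to collect):

```python
from pyspark.sql import functions as F

# First join: small_table filtered by createtime, joined to big_table.
# DPP already prunes big_table here, as the plan above shows.
first_join = (
    spark.table("jlee_ntm.tt_om_po").alias("small_table")
    .where(F.col("createtime") < "2003")
    .join(
        spark.table("jlee_ntm.TT_OM_POLINE").alias("big_table"),
        F.col("small_table.year") == F.col("big_table.year"),
    )
)

# Collect the distinct keys of the second join to the driver.
# This triggers a job, and only works if the key set is small.
months = [row["month"] for row in
          first_join.select("big_table.month").distinct().collect()]

# A literal IN filter on the partition column shows up in
# PartitionFilters at planning time, so no DPP is needed for this scan.
bigger_table = (
    spark.table("jlee_ntm.TT_OM_POLINE_SHIPMENT")
    .where(F.col("month").isin(months))
)

result = first_join.join(
    bigger_table,
    F.col("big_table.month") == bigger_table["month"],
)
```

It still requires rewriting the query by hand, so it doesn't solve the general problem, but it at least keeps everything in memory.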