[ 
https://issues.apache.org/jira/browse/SPARK-54554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041429#comment-18041429
 ] 

Dustin Smith commented on SPARK-54554:
--------------------------------------

PR: https://github.com/apache/spark/pull/53263

> Dynamic partition pruning not applied when partition key is derived from SHOW 
> PARTITIONS metadata and joined as broadcast (Spark 3.x and 4.x)
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-54554
>                 URL: https://issues.apache.org/jira/browse/SPARK-54554
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.4.4, 3.5.3, 3.5.4, 3.5.5, 4.1.0, 3.5.6, 4.0.0, 4.0.1, 
> 3.5.7, 3.5.8
>            Reporter: Dustin Smith
>            Priority: Major
>
> Dynamic Partition Pruning (DPP) in Spark is supposed to insert a
> dynamicpruningexpression on the partitioned table scan when:
>  * One side of a join is a partitioned table, and
>  * The other side supplies a filter on the partition column, and
>  * The join is a broadcast hash join (or otherwise eligible for DPP).
> This works in my patched build, but does not currently work in stock Spark
> 3.x and 4.x for a common pattern where the pruning values come from
> SHOW PARTITIONS metadata.
> MWE:
>  
> {code:java}
> import org.apache.spark.sql.functions._
> spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
> spark.conf.set("spark.sql.adaptive.enabled", "true")
> val data =
>   (1 to 100)
>     .map(i => (i, s"product_$i", i % 25, i * 10))
>     .toDF("date_id", "product_name", "store_id", "units_sold")
> data.write
>   .mode("overwrite")
>   .partitionBy("store_id")
>   .saveAsTable("test_table")
> val partDF = spark.sql("SHOW PARTITIONS test_table")
> val maxPartDF =
>   partDF
>     .agg(max("partition").alias("max_partition"))
>     .selectExpr("split(max_partition, '=')[1] as max_store_id")  
> val fact = spark.table("test_table").alias("f")
> val maxPartAliased = maxPartDF.alias("m")
> val df =
>   fact
>     .join(
>       broadcast(maxPartAliased),
>       col("f.store_id") === col("m.max_store_id").cast("int"),
>       "inner"
>     )
>     .select("f.date_id", "f.product_name", "f.units_sold", "f.store_id")
> df.explain("formatted") {code}
>  
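A note on the selectExpr step above: it relies on SHOW PARTITIONS returning one string row per partition in the form "store_id=<value>", from which split(max_partition, '=')[1] keeps only the value. A standalone sketch of that parsing (plain Scala, no Spark session; the sample spec is illustrative, not taken from a real run):

```scala
// SHOW PARTITIONS emits one partition spec per row, e.g. "store_id=24".
// The selectExpr in the MWE keeps only the value after '='.
val partitionSpec = "store_id=24" // illustrative sample row
val maxStoreId = partitionSpec.split("=")(1)
println(maxStoreId) // prints "24"
```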
> The scan of `test_table` does not show any dynamic partition pruning:
> {code}
> (1) Scan parquet spark_catalog.default.test_table
> Output [4]: [date_id#34, product_name#35, units_sold#36, store_id#37]
> Batched: true
> Location: InMemoryFileIndex [ ... store_id=0, ... 24 entries]
> PartitionFilters: [isnotnull(store_id#37)]
> ReadSchema: struct<date_id:int,product_name:string,units_sold:int>
> {code}
> There is no dynamicpruningexpression(...) in PartitionFilters, and no dynamic
> pruning subquery is attached to the scan. As a result, all 24 partitions of
> test_table are eligible to be scanned, and the broadcast join only filters
> rows after the full scan.
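To make the presence or absence of the filter easy to check, the plan text can be scanned for the marker string. A hedged sketch (the sample strings below stand in for df.queryExecution.executedPlan.toString, so no Spark session is needed here):

```scala
// Hedged sketch: detect a dynamic-pruning filter in a plan string.
// The two samples mimic the PartitionFilters lines shown above.
def dppApplied(planText: String): Boolean =
  planText.toLowerCase.contains("dynamicpruningexpression")

val unprunedPlan = "PartitionFilters: [isnotnull(store_id#37)]"
val prunedPlan =
  "PartitionFilters: [isnotnull(store_id#37), " +
    "dynamicpruningexpression(store_id#37 IN dynamicpruning#42)]"

println(dppApplied(unprunedPlan)) // false (current behavior in the MWE)
println(dppApplied(prunedPlan))   // true  (expected behavior)
```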
>  
> I would expect the dynamic partition pruning rule to recognize this pattern
> and insert a dynamicpruningexpression on the partitioned scan of test_table,
> similar to:
> {code}
> (1) Scan parquet spark_catalog.default.test_table
> PartitionFilters: [isnotnull(store_id#37),
>   dynamicpruningexpression(store_id#37 IN dynamicpruning#...)]
> {code}
> along with the corresponding subquery:
> {code}
> ===== Subqueries =====
> Subquery:1 Hosting operator id = 1 Hosting Expression = store_id#37 IN dynamicpruning#42
> {code}
>  
> I have a local patch that:
>  * Extends the dynamic partition pruning rule to handle the case where the
> pruning side originates from ShowPartitionsCommand (CommandResult) plus an
> aggregate, and is broadcast.
>  * Ensures the partition filter is inserted as a DynamicPruning expression on 
> the scan of test_table so that partition directories are pruned before 
> reading.
> I plan to open a GitHub PR and will link it here once available.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
