[
https://issues.apache.org/jira/browse/SPARK-54554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dustin Smith updated SPARK-54554:
---------------------------------
Labels: pull-request-available (was: )
> Dynamic partition pruning not applied when partition key is derived from SHOW
> PARTITIONS metadata and joined as broadcast (Spark 3.x and 4.x)
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-54554
> URL: https://issues.apache.org/jira/browse/SPARK-54554
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 3.4.4, 3.5.3, 3.5.4, 3.5.5, 4.1.0, 3.5.6, 4.0.0, 4.0.1,
> 3.5.7, 3.5.8
> Reporter: Dustin Smith
> Priority: Major
> Labels: pull-request-available
>
> Dynamic Partition Pruning (DPP) in Spark is supposed to insert a
> dynamicpruningexpression on the partitioned table scan when:
> * One side of a join is a partitioned table, and
> * The other side supplies a filter on the partition column, and
> * The join is a broadcast hash join (or otherwise eligible for DPP).
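> Conceptually, the DPP rule collects the join keys produced by the broadcast
> (build) side and uses them to skip non-matching partition directories before
> the fact-table scan. A minimal plain-Scala sketch of that pruning decision
> (hypothetical partition layout for illustration, not Spark internals):
> {code:scala}
> // Hypothetical sketch of the decision DPP makes: given the key set from the
> // broadcast side, only the matching partition directories should be scanned.
> object DppSketch {
>   // Partition directories keyed by store_id (hypothetical layout).
>   val partitions: Map[Int, String] =
>     (0 until 25).map(i => i -> s"test_table/store_id=$i").toMap
>
>   // Directories that survive pruning for a given set of broadcast-side keys.
>   def prunedScan(broadcastKeys: Set[Int]): Seq[String] =
>     partitions.collect { case (k, dir) if broadcastKeys(k) => dir }.toSeq
> }
> {code}
> Without DPP, every directory is read and rows are filtered only at the join;
> with it, a single-key broadcast side reads a single directory.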
> This works in my patched build, but does not currently work in stock Spark 3.x
> and 4.x for a common pattern where the pruning values come from SHOW PARTITIONS
> metadata.
> Minimal working example (MWE):
>
> {code:scala}
> import org.apache.spark.sql.functions._
> import spark.implicits._
>
> spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
> spark.conf.set("spark.sql.adaptive.enabled", "true")
>
> val data =
>   (1 to 100)
>     .map(i => (i, s"product_$i", i % 25, i * 10))
>     .toDF("date_id", "product_name", "store_id", "units_sold")
>
> data.write
>   .mode("overwrite")
>   .partitionBy("store_id")
>   .saveAsTable("test_table")
>
> val partDF = spark.sql("SHOW PARTITIONS test_table")
> val maxPartDF =
>   partDF
>     .agg(max("partition").alias("max_partition"))
>     .selectExpr("split(max_partition, '=')[1] as max_store_id")
>
> val fact = spark.table("test_table").alias("f")
> val maxPartAliased = maxPartDF.alias("m")
>
> val df =
>   fact
>     .join(
>       broadcast(maxPartAliased),
>       col("f.store_id") === col("m.max_store_id").cast("int"),
>       "inner"
>     )
>     .select("f.date_id", "f.product_name", "f.units_sold", "f.store_id")
>
> df.explain("formatted")
> {code}
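> For reference, the split(max_partition, '=')[1] step above corresponds to this
> plain-Scala parsing of the strings SHOW PARTITIONS returns (a sketch assuming
> single-level partitioning, e.g. "store_id=24"):
> {code:scala}
> // Parses a single-level partition spec string as returned by SHOW PARTITIONS,
> // e.g. "store_id=24". Mirrors split(max_partition, '=')[1] in the MWE above.
> object PartitionSpec {
>   def value(spec: String): Int = spec.split("=")(1).toInt
> }
> {code}
> Note that max("partition") in the MWE is a lexicographic max over strings
> ("store_id=9" sorts above "store_id=24"); that quirk is independent of the
> pruning behavior reported here.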
>
> The scan of `test_table` does not show any dynamic partition pruning:
> {code}
> (1) Scan parquet spark_catalog.default.test_table
> Output [4]: [date_id#34, product_name#35, units_sold#36, store_id#37]
> Batched: true
> Location: InMemoryFileIndex [ ... store_id=0, ... 24 entries]
> PartitionFilters: [isnotnull(store_id#37)]
> ReadSchema: struct<date_id:int,product_name:string,units_sold:int>
> {code}
> There is no dynamicpruningexpression(...) in PartitionFilters, and there is no
> dynamic pruning subquery attached to the scan. As a result, all 24 partitions
> of test_table are eligible to be scanned, and the broadcast join filters rows
> only after the full scan.
>
> I would expect the dynamic partition pruning rule to recognize this pattern and
> insert a dynamicpruningexpression on the partitioned scan of test_table,
> similar to:
> {code}
> (1) Scan parquet spark_catalog.default.test_table
> PartitionFilters: [isnotnull(store_id#37),
>                    dynamicpruningexpression(store_id#37 IN dynamicpruning#...)]
> {code}
> along with the corresponding subquery:
> {code}
> ===== Subqueries =====
> Subquery:1 Hosting operator id = 1 Hosting Expression = store_id#37 IN dynamicpruning#42
> {code}
>
> I have a local patch that:
> * Extends the dynamic partition pruning rule to handle the case where the
>   pruning side originates from ShowPartitionsCommand (CommandResult) plus an
>   aggregate and is broadcast.
> * Ensures the partition filter is inserted as a DynamicPruning expression on
>   the scan of test_table so that partition directories are pruned before
>   reading.
> I plan to open a GitHub PR and will link it here once available.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]