Dustin Smith created SPARK-54554:
------------------------------------
Summary: Dynamic partition pruning not applied when partition key
is derived from SHOW PARTITIONS metadata and joined as broadcast (Spark 3.x and
4.x)
Key: SPARK-54554
URL: https://issues.apache.org/jira/browse/SPARK-54554
Project: Spark
Issue Type: Bug
Components: Optimizer, SQL
Affects Versions: 3.5.7, 4.0.1, 4.0.0, 3.5.6, 3.5.5, 3.5.4, 3.5.3, 3.4.4,
4.1.0, 3.5.8
Reporter: Dustin Smith
Dynamic Partition Pruning (DPP) in Spark is supposed to insert a
dynamicpruningexpression on the partitioned table scan when:
* One side of a join is a partitioned table, and
* The other side supplies a filter on the partition column, and
* The join is a broadcast hash join (or otherwise eligible for DPP).
This works in my patched build, but does not currently work in stock Spark 3.x
and 4.x
for a common pattern where the pruning values come from SHOW PARTITIONS
metadata.
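For intuition, DPP attaches the build side's runtime values as a filter on the partitioned scan, so only matching partition directories are read. A toy pure-Scala model of that pruning step (illustrative only, not Spark internals; directory paths and values are made up):

```scala
// Toy model of dynamic partition pruning (not Spark internals).
// Partition directories of the fact table, keyed by partition value.
val partitionDirs: Map[Int, String] =
  (0 to 24).map(i => i -> s"/warehouse/test_table/store_id=$i").toMap

// Values the broadcast (build) side produces at runtime.
val pruningValues: Set[Int] = Set(9)

// With DPP applied, only matching directories are scanned;
// without it, every entry in partitionDirs is read and rows are
// filtered only after the full scan.
val scannedDirs = partitionDirs.filter { case (storeId, _) => pruningValues(storeId) }

println(scannedDirs.size) // 1 directory read instead of 25
```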
MWE:
{code:java}
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF when not running in spark-shell
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.adaptive.enabled", "true")
val data =
  (1 to 100)
    .map(i => (i, s"product_$i", i % 25, i * 10))
    .toDF("date_id", "product_name", "store_id", "units_sold")

data.write
  .mode("overwrite")
  .partitionBy("store_id")
  .saveAsTable("test_table")
val partDF = spark.sql("SHOW PARTITIONS test_table")
val maxPartDF =
partDF
.agg(max("partition").alias("max_partition"))
.selectExpr("split(max_partition, '=')[1] as max_store_id")
val fact = spark.table("test_table").alias("f")
val maxPartAliased = maxPartDF.alias("m")
val df =
fact
.join(
broadcast(maxPartAliased),
col("f.store_id") === col("m.max_store_id").cast("int"),
"inner"
)
.select("f.date_id", "f.product_name", "f.units_sold", "f.store_id")
df.explain("formatted")
{code}
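For reference, the pruning side above derives its value from the strings SHOW PARTITIONS returns (one per partition directory, of the form store_id=<n>). A minimal pure-Scala sketch of that aggregate/split step, with sample strings mirroring the table layout (no Spark required; this is orthogonal to the DPP question, though note the max over strings is lexicographic):

```scala
// Sample of what SHOW PARTITIONS test_table yields: one string per
// partition directory (sample values, not captured Spark output).
val partitions = (0 to 24).map(i => s"store_id=$i")

// The MWE takes max("partition"); over strings this is lexicographic,
// so "store_id=9" sorts above "store_id=24".
val maxPartition = partitions.max

// Equivalent of selectExpr("split(max_partition, '=')[1]").
val maxStoreId = maxPartition.split("=")(1)

println(maxStoreId) // "9" under lexicographic ordering
```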
The scan of `test_table` does not show any dynamic partition pruning:
(1) Scan parquet spark_catalog.default.test_table
Output [4]: [date_id#34, product_name#35, units_sold#36, store_id#37]
Batched: true
Location: InMemoryFileIndex [ ... store_id=0, ... 24 entries]
PartitionFilters: [isnotnull(store_id#37)]
ReadSchema: struct<date_id:int,product_name:string,units_sold:int>
There is no dynamicpruningexpression(...) in PartitionFilters, and there is no
dynamic pruning subquery attached to the scan.
As a result, every partition of test_table is eligible to be scanned, and the
broadcast join only filters rows after the full scan.
I would expect the dynamic partition pruning rule to recognize this and insert a
dynamicpruningexpression on the partitioned scan of test_table, similar to:
(1) Scan parquet spark_catalog.default.test_table
PartitionFilters: [isnotnull(store_id#37),
dynamicpruningexpression(store_id#37 IN dynamicpruning#...)]
as well as the corresponding pruning subquery:
===== Subqueries =====
Subquery:1 Hosting operator id = 1 Hosting Expression = store_id#37 IN
dynamicpruning#42
I have a local patch that:
* Extends the dynamic partition pruning rule to handle the case where the
pruning side originates from ShowPartitionsCommand (CommandResult) plus an
aggregate and is broadcast.
* Ensures the partition filter is inserted as a DynamicPruning expression on
the scan of test_table so that partition directories are pruned before reading.
I plan to open a GitHub PR and will link it here once available.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)