Dustin Smith created SPARK-54554:
------------------------------------

             Summary: Dynamic partition pruning not applied when partition key 
is derived from SHOW PARTITIONS metadata and joined as broadcast (Spark 3.x and 
4.x)
                 Key: SPARK-54554
                 URL: https://issues.apache.org/jira/browse/SPARK-54554
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, SQL
    Affects Versions: 3.5.7, 4.0.1, 4.0.0, 3.5.6, 3.5.5, 3.5.4, 3.5.3, 3.4.4, 
4.1.0, 3.5.8
            Reporter: Dustin Smith


Dynamic Partition Pruning (DPP) in Spark is supposed to insert a 
dynamicpruningexpression on the partitioned table's scan when:
 * one side of a join is a partitioned table,
 * the other side supplies a filter on the partition column, and
 * the join is a broadcast hash join (or is otherwise eligible for DPP).

This works in my patched build, but it does not work in stock Spark 3.x and 4.x 
for a common pattern where the pruning values come from SHOW PARTITIONS 
metadata.

Minimal working example (MWE):

{code:java}
import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF on a local Seq

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Build a small fact table partitioned by store_id.
val data =
  (1 to 100)
    .map(i => (i, s"product_$i", i % 25, i * 10))
    .toDF("date_id", "product_name", "store_id", "units_sold")

data.write
  .mode("overwrite")
  .partitionBy("store_id")
  .saveAsTable("test_table")

// Derive a pruning value from SHOW PARTITIONS metadata,
// e.g. "store_id=24" -> "24".
val partDF = spark.sql("SHOW PARTITIONS test_table")

val maxPartDF =
  partDF
    .agg(max("partition").alias("max_partition"))
    .selectExpr("split(max_partition, '=')[1] as max_store_id")

val fact = spark.table("test_table").alias("f")
val maxPartAliased = maxPartDF.alias("m")

// Broadcast the single-row pruning side and join on the partition column.
val df =
  fact
    .join(
      broadcast(maxPartAliased),
      col("f.store_id") === col("m.max_store_id").cast("int"),
      "inner"
    )
    .select("f.date_id", "f.product_name", "f.units_sold", "f.store_id")

df.explain("formatted")
{code}
 

The scan of {{test_table}} does not show any dynamic partition pruning:

{noformat}
(1) Scan parquet spark_catalog.default.test_table
Output [4]: [date_id#34, product_name#35, units_sold#36, store_id#37]
Batched: true
Location: InMemoryFileIndex [ ... store_id=0, ... 24 entries]
PartitionFilters: [isnotnull(store_id#37)]
ReadSchema: struct<date_id:int,product_name:string,units_sold:int>
{noformat}

There is no dynamicpruningexpression(...) in PartitionFilters, and no dynamic 
pruning subquery is attached to the scan.

As a result, all 24 partitions of test_table are eligible to be scanned, and the
broadcast join filters rows only after the full scan.
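Until this shape is handled, a possible workaround (a sketch, not the patch 
described below, and assuming a one-row driver collect is acceptable) is to 
materialize the max partition value on the driver and filter with a literal, so 
static partition pruning applies:

{code:java}
// Workaround sketch: collect the single SHOW PARTITIONS aggregate row to
// the driver and filter with a literal, so ordinary static partition
// pruning applies instead of DPP.
import org.apache.spark.sql.functions._

// "store_id=24" -> 24 (note: max over the partition strings is lexicographic)
val maxStoreId = spark.sql("SHOW PARTITIONS test_table")
  .agg(max("partition").alias("max_partition"))
  .selectExpr("split(max_partition, '=')[1] as max_store_id")
  .first().getString(0).toInt

val pruned = spark.table("test_table").where(col("store_id") === maxStoreId)
pruned.explain("formatted")
// PartitionFilters should now contain a literal equality on store_id.
{code}

This trades a driver round trip for guaranteed pruning; it does not fix the 
optimizer gap reported here.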

 

I would expect the dynamic partition pruning rule to recognize this and insert a
dynamicpruningexpression on the partitioned scan of test_table, similar to:

{noformat}
(1) Scan parquet spark_catalog.default.test_table
PartitionFilters: [isnotnull(store_id#37),
  dynamicpruningexpression(store_id#37 IN dynamicpruning#...)]
{noformat}

as well as the corresponding subquery:

{noformat}
===== Subqueries =====

Subquery:1 Hosting operator id = 1 Hosting Expression = store_id#37 IN 
dynamicpruning#42
{noformat}
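For contrast, stock Spark does attach such an expression when the pruning side 
is an ordinary filtered table rather than SHOW PARTITIONS output; a minimal 
sketch (the dim_stores table name and its contents are hypothetical):

{code:java}
import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical one-row dimension table keyed on the partition column.
Seq((24, "flagship"))
  .toDF("store_id", "kind")
  .write.mode("overwrite").saveAsTable("dim_stores")

val contrast = spark.table("test_table")
  .join(
    broadcast(spark.table("dim_stores").where(col("kind") === "flagship")),
    Seq("store_id")
  )

contrast.explain("formatted")
// Expected: the test_table scan's PartitionFilters include
// dynamicpruningexpression(store_id#... IN dynamicpruning#...)
{code}

This suggests the gap is specific to pruning sides that originate from command 
output rather than a regular relation.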

 

I have a local patch that:
 * Extends the dynamic partition pruning rule to handle the case where the 
pruning side originates from ShowPartitionsCommand (CommandResult) plus an 
aggregate and is broadcast.
 * Ensures the partition filter is inserted as a DynamicPruning expression on 
the scan of test_table so that partition directories are pruned before reading.

I plan to open a GitHub PR and will link it here once available.

 



