[ 
https://issues.apache.org/jira/browse/SPARK-54554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dustin Smith updated SPARK-54554:
---------------------------------
       Language: Scala
    Description: 
Dynamic Partition Pruning (DPP) in Spark is supposed to insert a 
dynamicpruningexpression on the partitioned table scan when:
 * One side of a join is a partitioned table, and
 * The other side supplies a filter on the partition column, and
 * The join is a broadcast hash join (or otherwise eligible for DPP).

This works in my patched build, but does not currently work in stock Spark 3.x 
and 4.x for a common pattern where the pruning values come from SHOW PARTITIONS 
metadata. (For contrast, the sketch below shows a conventional shape where 
stock Spark does apply DPP.)
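
A minimal contrasting sketch (my own, not from the report): the canonical DPP-eligible shape with a literal filter on a broadcast dimension side. It assumes the partitioned test_table created in the reproduction below; dim_store is a hypothetical table created purely for illustration.
{code:java}
import org.apache.spark.sql.functions._

// Hypothetical dimension table, for illustration only.
spark.range(0, 25)
  .select(col("id").cast("int").as("store_id"), (col("id") % 2).as("region"))
  .write.mode("overwrite").saveAsTable("dim_store")

// Literal filter on the broadcast side + equi-join on the partition column:
// here stock Spark does insert dynamicpruningexpression(...) on the
// test_table scan.
val contrastDf = spark.table("test_table").alias("f")
  .join(
    broadcast(spark.table("dim_store").filter(col("region") === 0).alias("d")),
    col("f.store_id") === col("d.store_id"))

contrastDf.explain("formatted")
{code}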

MWE:

 
{code:java}
import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Note: assumes a spark-shell session where spark.implicits._ is already
// in scope; in a standalone app, add `import spark.implicits._` for toDF.
val data =
  (1 to 100)
    .map(i => (i, s"product_$i", i % 25, i * 10))
    .toDF("date_id", "product_name", "store_id", "units_sold")

data.write
  .mode("overwrite")
  .partitionBy("store_id")
  .saveAsTable("test_table")

val partDF = spark.sql("SHOW PARTITIONS test_table")

val maxPartDF =
  partDF
    .agg(max("partition").alias("max_partition"))
    .selectExpr("split(max_partition, '=')[1] as max_store_id")  

val fact = spark.table("test_table").alias("f")
val maxPartAliased = maxPartDF.alias("m")

val df =
  fact
    .join(
      broadcast(maxPartAliased),
      col("f.store_id") === col("m.max_store_id").cast("int"),
      "inner"
    )
    .select("f.date_id", "f.product_name", "f.units_sold", "f.store_id")

df.explain("formatted") {code}
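
A quick programmatic check (my own sketch, not from the original report): search the executed plan text for the DPP marker instead of reading the formatted output by eye.
{code:java}
// With AQE enabled, executedPlan is the adaptive plan; its string form still
// contains any dynamicpruningexpression that was inserted.
val planText = df.queryExecution.executedPlan.toString
println(s"DPP applied: ${planText.contains("dynamicpruningexpression")}")
// Prints "DPP applied: false" for the query above on stock Spark.
{code}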
 

The scan of `test_table` does not show any dynamic partition pruning:

(1) Scan parquet spark_catalog.default.test_table
Output [4]: [date_id#34, product_name#35, units_sold#36, store_id#37]
Batched: true
Location: InMemoryFileIndex [ ... store_id=0, ... 24 entries]
PartitionFilters: [isnotnull(store_id#37)]
ReadSchema: struct<date_id:int,product_name:string,units_sold:int>

There is no dynamicpruningexpression(...) in PartitionFilters, and there is no
dynamic pruning subquery attached to the scan.

As a result, all 24 partitions of test_table are eligible to be scanned, and the
broadcast join only filters after the full scan.
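
Until the rule covers this shape, a workaround sketch (my own, not part of the patch): materialize the pruning value on the driver and apply it as a literal filter, which Spark turns into a static partition filter.
{code:java}
// maxPartDF has a single string column (max_store_id), so collect() returns
// one Row; pushing the literal back in prunes partitions at planning time.
val maxStoreId = maxPartDF.collect().head.getString(0).toInt
val prunedDf = spark.table("test_table").filter(col("store_id") === maxStoreId)
prunedDf.explain("formatted") // PartitionFilters now include (store_id = <literal>)
{code}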

 

I would expect the dynamic partition pruning rule to recognize this and insert a
dynamicpruningexpression on the partitioned scan of test_table, similar to:

(1) Scan parquet spark_catalog.default.test_table
PartitionFilters: [isnotnull(store_id#37),
  dynamicpruningexpression(store_id#37 IN dynamicpruning#...)]

along with the corresponding subquery:

===== Subqueries =====

Subquery:1 Hosting operator id = 1 Hosting Expression = store_id#37 IN 
dynamicpruning#42

 

I have a local patch that:
 * Extends the dynamic partition pruning rule to handle the case where the 
pruning side originates from ShowPartitionsCommand (a CommandResult node; see 
the inspection sketch below) plus an aggregate, and is broadcast.
 * Ensures the partition filter is inserted as a DynamicPruning expression on 
the scan of test_table so that partition directories are pruned before reading.
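
To see why the stock rule skips this pattern, the pruning side's optimized logical plan can be inspected (a sketch based on my reading of the internals; verify against your build):
{code:java}
// Print the logical node names on the pruning side. On stock Spark the
// SHOW PARTITIONS result surfaces as a CommandResult node, which the stock
// PartitionPruning rule does not accept as a pruning source (assumption).
maxPartDF.queryExecution.optimizedPlan.foreach(n => println(n.nodeName))
{code}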

GitHub PR: https://github.com/apache/spark/pull/53263

 

> Dynamic partition pruning not applied when partition key is derived from SHOW 
> PARTITIONS metadata and joined as broadcast (Spark 3.x and 4.x)
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-54554
>                 URL: https://issues.apache.org/jira/browse/SPARK-54554
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.4.4, 3.5.3, 3.5.4, 3.5.5, 4.1.0, 3.5.6, 4.0.0, 4.0.1, 
> 3.5.7, 3.5.8
>            Reporter: Dustin Smith
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
