[ 
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chao Sun updated SPARK-44641:
-----------------------------
        Parent: SPARK-37375
    Issue Type: Sub-task  (was: Bug)

> Results duplicated when SPJ partial-cluster and pushdown enabled but 
> conditions unmet
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44641
>                 URL: https://issues.apache.org/jira/browse/SPARK-44641
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.4.1
>            Reporter: Szehon Ho
>            Priority: Major
>
> Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
> problem.
>  
> {code:java}
> test("test join key is the second partition key and a transform") {
>   val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
>   createTable(items, items_schema, items_partitions)
>   sql(s"INSERT INTO testcat.ns.$items VALUES " +
>     s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
>     s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
>     s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
>     s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
>     s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>   val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
>   createTable(purchases, purchases_schema, purchases_partitions)
>   sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
>     s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
>     s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
>     s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
>     s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
>     s"(3, 19.5, cast('2020-02-01' as timestamp))")
>   withSQLConf(
>     SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
>     SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
>     SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
>       "true") {
>     val df = sql("SELECT id, name, i.price as purchase_price, " +
>       "p.item_id, p.price as sale_price " +
>       s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
>       "ON i.arrive_time = p.time " +
>       "ORDER BY id, purchase_price, p.item_id, sale_price")
>     val shuffles = collectShuffles(df.queryExecution.executedPlan)
>     assert(!shuffles.isEmpty,
>       "should not perform SPJ as not all join keys are partition keys")
>     checkAnswer(df,
>       Seq(
>         Row(1, "aa", 40.0, 1, 42.0),
>         Row(1, "aa", 40.0, 2, 11.0),
>         Row(1, "aa", 41.0, 1, 44.0),
>         Row(1, "aa", 41.0, 1, 45.0),
>         Row(2, "bb", 10.0, 1, 42.0),
>         Row(2, "bb", 10.0, 2, 11.0),
>         Row(2, "bb", 10.5, 1, 42.0),
>         Row(2, "bb", 10.5, 2, 11.0),
>         Row(3, "cc", 15.5, 3, 19.5)
>       )
>     )
>   }
> }{code}
>  
> Note: this test sets up the DataSource V2 to return multiple splits for the 
> same partition.
> In this case SPJ is not triggered (because the join key does not match the 
> partition key), but the following code in BatchScanExec:
> [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]
> which is intended to fill empty partitions for the pushed-down partition 
> values, still iterates through the non-grouped partitions and looks each one 
> up in the grouped partitions to fill the map, so some input data is fed into 
> the join twice, producing duplicate results.
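A minimal, hypothetical sketch of the pattern described above (this is a simplification for illustration, not Spark's actual BatchScanExec code; `Split`, `feedToJoin`, and the sample values are made up). When two splits share a partition value, iterating the non-grouped splits and looking each one up in the grouped map emits that partition's rows once per split:

```scala
object SpjDuplicationSketch {
  // A hypothetical input split carrying its partition value and some rows.
  case class Split(partValue: Int, rows: Seq[String])

  // Mirrors the buggy pattern: build a grouped view (partition value -> all
  // rows for that partition), then iterate the ORIGINAL non-grouped splits
  // and look each one up in the grouped map.
  def feedToJoin(splits: Seq[Split]): Seq[String] = {
    val grouped: Map[Int, Seq[String]] =
      splits.groupBy(_.partValue).map { case (k, ss) => k -> ss.flatMap(_.rows) }
    // Partition 1 has two splits, so grouped(1) is emitted twice.
    splits.flatMap(s => grouped(s.partValue))
  }

  def main(args: Array[String]): Unit = {
    // Two splits share partition value 1, as in the test's DataSource V2 setup.
    val splits = Seq(Split(1, Seq("a")), Split(1, Seq("b")), Split(2, Seq("c")))
    println(feedToJoin(splits)) // rows "a" and "b" appear twice
  }
}
```

Iterating the grouped partitions directly (one lookup per distinct partition value) would avoid the double emission; the sketch only shows why iterating the non-grouped splits duplicates input.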



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
