Szehon Ho created SPARK-44641:
---------------------------------

             Summary: Results duplicated when SPJ partial-cluster and pushdown 
enabled but conditions unmet
                 Key: SPARK-44641
                 URL: https://issues.apache.org/jira/browse/SPARK-44641
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.1
            Reporter: Szehon Ho


Adding the following test case to KeyGroupedPartitioningSuite demonstrates the 
problem.

 
{code:java}
test("test join key is the second partition key and a transform") {
  val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
  createTable(items, items_schema, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
    s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
    s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
  createTable(purchases, purchases_schema, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
    s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
    s"(3, 19.5, cast('2020-02-01' as timestamp))")

  withSQLConf(
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
    SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
      "true") {
    val df = sql("SELECT id, name, i.price as purchase_price, " +
      "p.item_id, p.price as sale_price " +
      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
      "ON i.arrive_time = p.time " +
      "ORDER BY id, purchase_price, p.item_id, sale_price")

    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    assert(!shuffles.isEmpty,
      "should not perform SPJ as not all join keys are partition keys")
    checkAnswer(df,
      Seq(
        Row(1, "aa", 40.0, 1, 42.0),
        Row(1, "aa", 40.0, 2, 11.0),
        Row(1, "aa", 41.0, 1, 44.0),
        Row(1, "aa", 41.0, 1, 45.0),
        Row(2, "bb", 10.0, 1, 42.0),
        Row(2, "bb", 10.0, 2, 11.0),
        Row(2, "bb", 10.5, 1, 42.0),
        Row(2, "bb", 10.5, 2, 11.0),
        Row(3, "cc", 15.5, 3, 19.5)
      )
    )
  }
}{code}
 

This tests the case where the DataSource V2 source returns multiple splits for 
the same partition.

In this case, SPJ is not triggered, but the following code in the DSV2 scan 
(BatchScanExec):

https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194

intended to fill the empty partitions for 'pushdown-value', still iterates 
through the non-grouped partitions and looks each one up in the grouped 
partitions to fill the map, so that some input data is fed into the join more 
than once.
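
The duplication pattern described above can be sketched with a small toy model. 
This is only an illustration under stated assumptions, not the actual 
BatchScanExec code: `Split`, `fillPerSplit`, and `fillPerDistinctValue` are 
hypothetical names, and a partition value is modelled as a plain string. It 
shows that doing one lookup per non-grouped split pulls in a whole group once 
for every split that shares its partition value, while doing one lookup per 
distinct partition value does not.

```scala
// Toy model only: `Split` and the helpers below are hypothetical names,
// not Spark classes or methods.
final case class Split(partitionValue: String, rows: Seq[Int])

object DuplicateFillSketch {
  // Build the "grouped" lookup: partition value -> all splits with that value.
  def group(splits: Seq[Split]): Map[String, Seq[Split]] =
    splits.groupBy(_.partitionValue)

  // Problematic pattern: one lookup per non-grouped split. A partition value
  // reported by N splits gets its whole group emitted N times.
  def fillPerSplit(splits: Seq[Split]): Seq[Seq[Split]] = {
    val grouped = group(splits)
    splits.map(s => grouped.getOrElse(s.partitionValue, Seq.empty))
  }

  // Alternative in this toy model: one lookup per distinct partition value.
  def fillPerDistinctValue(splits: Seq[Split]): Seq[Seq[Split]] = {
    val grouped = group(splits)
    splits.map(_.partitionValue).distinct
      .map(v => grouped.getOrElse(v, Seq.empty))
  }

  // Flatten the final partitions into the rows that would reach the join.
  def rowsRead(partitions: Seq[Seq[Split]]): Seq[Int] =
    partitions.flatten.flatMap(_.rows)

  def main(args: Array[String]): Unit = {
    // Two splits share partition value "a"; one split has value "b".
    val splits = Seq(Split("a", Seq(1)), Split("a", Seq(2)), Split("b", Seq(3)))
    println(rowsRead(fillPerSplit(splits)))         // List(1, 2, 1, 2, 3)
    println(rowsRead(fillPerDistinctValue(splits))) // List(1, 2, 3)
  }
}
```

In the toy input, rows 1 and 2 are read twice by the per-split variant because 
both splits of partition "a" look up the same group. This matches the symptom 
in the test above, where a source returning multiple splits for one partition 
produces duplicated join output.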



