[ https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-44641:
----------------------------------
    Affects Version/s: 3.4.0

> SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-44641
>                 URL: https://issues.apache.org/jira/browse/SPARK-44641
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.4.0, 3.4.1
>            Reporter: Szehon Ho
>            Priority: Blocker
>
> Adding the following test case to KeyGroupedPartitioningSuite demonstrates the problem:
>
> {code:java}
> test("test join key is the second partition key and a transform") {
>   val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
>   createTable(items, items_schema, items_partitions)
>   sql(s"INSERT INTO testcat.ns.$items VALUES " +
>     s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
>     s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
>     s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
>     s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
>     s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>
>   val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
>   createTable(purchases, purchases_schema, purchases_partitions)
>   sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
>     s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
>     s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
>     s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
>     s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
>     s"(3, 19.5, cast('2020-02-01' as timestamp))")
>
>   withSQLConf(
>     SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
>     SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
>     SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") {
>     val df = sql("SELECT id, name, i.price as purchase_price, " +
>       "p.item_id, p.price as sale_price " +
>       s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
>       "ON i.arrive_time = p.time " +
>       "ORDER BY id, purchase_price, p.item_id, sale_price")
>
>     val shuffles = collectShuffles(df.queryExecution.executedPlan)
>     assert(!shuffles.isEmpty,
>       "should not perform SPJ as not all join keys are partition keys")
>
>     checkAnswer(df,
>       Seq(
>         Row(1, "aa", 40.0, 1, 42.0),
>         Row(1, "aa", 40.0, 2, 11.0),
>         Row(1, "aa", 41.0, 1, 44.0),
>         Row(1, "aa", 41.0, 1, 45.0),
>         Row(2, "bb", 10.0, 1, 42.0),
>         Row(2, "bb", 10.0, 2, 11.0),
>         Row(2, "bb", 10.5, 1, 42.0),
>         Row(2, "bb", 10.5, 2, 11.0),
>         Row(3, "cc", 15.5, 3, 19.5)
>       )
>     )
>   }
> }{code}
>
> Note: this test sets up the DataSourceV2 source to return multiple splits for the same partition.
>
> In this case SPJ is not triggered (the join key does not match the partition keys), but the following code in BatchScanExec:
> [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]
> which is intended to fill in empty partitions for pushed-down partition values, still iterates through the non-grouped partitions and looks each one up in the grouped partitions to fill the map, so some of the input data is fed into the join more than once.
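For illustration only, here is a minimal, self-contained sketch (hypothetical names; Split, partValue and the grouped map are made up for this example and are not Spark's actual types or the code at the link above) of the lookup pattern described in the report: when a partition value has more than one split, expanding each ungrouped split through a map of splits grouped by partition value emits every split of that value once per split, i.e. duplicates.

{code:java}
// Sketch of the duplication pattern under the assumptions stated above.
object DuplicateLookupSketch {
  // Hypothetical stand-in for an input split carrying its partition value.
  case class Split(partValue: Int, file: String)

  def main(args: Array[String]): Unit = {
    // Two splits share partition value 1, mimicking a DSv2 source that
    // returns multiple splits for the same partition.
    val splits = Seq(Split(1, "a"), Split(1, "b"), Split(2, "c"))

    // Splits grouped by partition value.
    val grouped: Map[Int, Seq[Split]] = splits.groupBy(_.partValue)

    // Problematic pattern: iterate the *ungrouped* splits and expand each one
    // via the grouped map. Value 1 is expanded twice, so "a" and "b" appear twice.
    val planned = splits.flatMap(s => grouped(s.partValue))
    println(planned.map(_.file))        // List(a, b, a, b, c)

    // Iterating the grouped entries instead yields each split exactly once.
    val deduped = grouped.values.flatten.toSeq
    println(deduped.map(_.file).sorted) // List(a, b, c)
  }
}
{code}

With a single split per partition value the two traversals coincide, which is why the duplication only shows up when the source returns multiple splits for the same partition, as in the test case above.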