[jira] [Created] (SPARK-48421) SPJ: Add documentation

2024-05-25 Thread Szehon Ho (Jira)
Szehon Ho created SPARK-48421:
-

 Summary: SPJ: Add documentation
 Key: SPARK-48421
 URL: https://issues.apache.org/jira/browse/SPARK-48421
 Project: Spark
  Issue Type: Documentation
  Components: SQL
Affects Versions: 4.0.0
Reporter: Szehon Ho


As part of SPARK-48329, we mentioned "Storage Partition Join" but noticed that 
there is no documentation describing it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-48329) Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true

2024-05-20 Thread Szehon Ho (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847963#comment-17847963
 ] 

Szehon Ho edited comment on SPARK-48329 at 5/20/24 10:37 PM:
-

I cherry-picked your doc change to my PR.  I think it should work to also add 
you as a co-author, is that OK?


was (Author: szehon):
I cherry-picked your doc change to my PR to make you a co-author, is that OK?

> Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true
> -
>
> Key: SPARK-48329
> URL: https://issues.apache.org/jira/browse/SPARK-48329
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Szehon Ho
>Priority: Minor
>  Labels: pull-request-available
>
> The SPJ feature flag 'spark.sql.sources.v2.bucketing.pushPartValues.enabled' 
> has proven valuable for most use cases.  We should take advantage of the 4.0 
> release and change the default to true.






[jira] [Commented] (SPARK-48329) Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true

2024-05-20 Thread Szehon Ho (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847963#comment-17847963
 ] 

Szehon Ho commented on SPARK-48329:
---

I cherry-picked your doc change to my PR to make you a co-author, is that OK?

> Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true
> -
>
> Key: SPARK-48329
> URL: https://issues.apache.org/jira/browse/SPARK-48329
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Szehon Ho
>Priority: Minor
>  Labels: pull-request-available
>
> The SPJ feature flag 'spark.sql.sources.v2.bucketing.pushPartValues.enabled' 
> has proven valuable for most use cases.  We should take advantage of the 4.0 
> release and change the default to true.






[jira] [Comment Edited] (SPARK-48329) Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true

2024-05-20 Thread Szehon Ho (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847962#comment-17847962
 ] 

Szehon Ho edited comment on SPARK-48329 at 5/20/24 6:58 PM:


Oh, sorry, I just saw this and also made a PR; I was going to make this PR 
earlier but was waiting on internal review.  I think yours works too, but I 
also made some simplifications in the test suites.  Maybe we can be co-authors 
here.


was (Author: szehon):
Oh, sorry, I just saw this; I was about to make this PR but was waiting on 
internal review.  I think yours works too, but I also made some simplifications 
in the test suites.  Maybe we can be co-authors here.

> Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true
> -
>
> Key: SPARK-48329
> URL: https://issues.apache.org/jira/browse/SPARK-48329
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Szehon Ho
>Priority: Minor
>  Labels: pull-request-available
>
> The SPJ feature flag 'spark.sql.sources.v2.bucketing.pushPartValues.enabled' 
> has proven valuable for most use cases.  We should take advantage of the 4.0 
> release and change the default to true.






[jira] [Commented] (SPARK-48329) Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true

2024-05-20 Thread Szehon Ho (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847962#comment-17847962
 ] 

Szehon Ho commented on SPARK-48329:
---

Oh, sorry, I just saw this; I was about to make this PR but was waiting on 
internal review.  I think yours works too, but I also made some simplifications 
in the test suites.  Maybe we can be co-authors here.

> Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true
> -
>
> Key: SPARK-48329
> URL: https://issues.apache.org/jira/browse/SPARK-48329
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Szehon Ho
>Priority: Minor
>  Labels: pull-request-available
>
> The SPJ feature flag 'spark.sql.sources.v2.bucketing.pushPartValues.enabled' 
> has proven valuable for most use cases.  We should take advantage of the 4.0 
> release and change the default to true.






[jira] [Created] (SPARK-48329) Default spark.sql.sources.v2.bucketing.pushPartValues.enabled to true

2024-05-17 Thread Szehon Ho (Jira)
Szehon Ho created SPARK-48329:
-

 Summary: Default 
spark.sql.sources.v2.bucketing.pushPartValues.enabled to true
 Key: SPARK-48329
 URL: https://issues.apache.org/jira/browse/SPARK-48329
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 4.0.0
Reporter: Szehon Ho


The SPJ feature flag 'spark.sql.sources.v2.bucketing.pushPartValues.enabled' 
has proven valuable for most use cases.  We should take advantage of the 4.0 
release and change the default to true.
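If the default does change in 4.0, jobs that depend on the old behaviour can pin the flag explicitly rather than relying on the version-dependent default. A minimal sketch, assuming a standard spark-defaults.conf deployment (illustrative only):

```
# spark-defaults.conf — pin the SPJ partition-value pushdown flag explicitly
# instead of relying on the default, which differs across Spark versions
spark.sql.sources.v2.bucketing.pushPartValues.enabled  true
```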






[jira] [Created] (SPARK-48065) SPJ: allowJoinKeysSubsetOfPartitionKeys is too strict

2024-04-30 Thread Szehon Ho (Jira)
Szehon Ho created SPARK-48065:
-

 Summary: SPJ: allowJoinKeysSubsetOfPartitionKeys is too strict
 Key: SPARK-48065
 URL: https://issues.apache.org/jira/browse/SPARK-48065
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.4.3
Reporter: Szehon Ho
 Fix For: 4.0.0


If spark.sql.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled is true, 
then SPJ no longer triggers if there are more join keys than partition keys.  
It is triggered only if the join keys are equal to, or a subset of, the 
partition keys.

 

We can relax this constraint, as this case is supported when the flag is not 
enabled.
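The constraint can be pictured with a small pure-Python model (illustrative only; the function and its arguments are hypothetical, and Spark's real check lives in the query planner):

```python
def spj_triggers(join_keys, partition_keys, allow_subset):
    """Toy model of when storage-partitioned join can kick in."""
    join, part = set(join_keys), set(partition_keys)
    if not allow_subset:
        # Default behaviour: every partition key must appear among the join
        # keys; extra join keys are fine.
        return part <= join
    # With the flag enabled (the behaviour this issue calls too strict):
    # the join keys must not exceed the partition keys.
    return join <= part

# More join keys than partition keys: supported without the flag...
assert spj_triggers(["id", "name"], ["id"], allow_subset=False)
# ...but rejected with it, which is the over-strictness reported here.
assert not spj_triggers(["id", "name"], ["id"], allow_subset=True)
```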






[jira] [Updated] (SPARK-48012) SPJ: Support Transform Expressions for One Side Shuffle

2024-04-26 Thread Szehon Ho (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-48012:
--
Parent: SPARK-37375
Issue Type: Sub-task  (was: New Feature)

> SPJ: Support Transform Expressions for One Side Shuffle
> ---
>
> Key: SPARK-48012
> URL: https://issues.apache.org/jira/browse/SPARK-48012
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.3
>Reporter: Szehon Ho
>Priority: Major
>
> SPARK-41471 allowed Spark to shuffle just one side and still conduct SPJ, if 
> the other side is KeyGroupedPartitioning.  However, the support was only for 
> a KeyGroupedPartitioning without any partition transform (day, year, bucket).  
> It would be useful to add support for partition transforms as well, as many 
> tables are partitioned by those transforms.






[jira] [Created] (SPARK-48012) SPJ: Support Transform Expressions for One Side Shuffle

2024-04-26 Thread Szehon Ho (Jira)
Szehon Ho created SPARK-48012:
-

 Summary: SPJ: Support Transform Expressions for One Side Shuffle
 Key: SPARK-48012
 URL: https://issues.apache.org/jira/browse/SPARK-48012
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 3.4.3
Reporter: Szehon Ho


SPARK-41471 allowed Spark to shuffle just one side and still conduct SPJ, if 
the other side is KeyGroupedPartitioning.  However, the support was only for a 
KeyGroupedPartitioning without any partition transform (day, year, bucket).  It 
would be useful to add support for partition transforms as well, as many tables 
are partitioned by those transforms.
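The one-side shuffle can be sketched in plain Python (a toy model; `bucket` here is a simple modulo stand-in for Spark's hash-based bucket transform, and all names are illustrative):

```python
from collections import defaultdict

def bucket(n, value):
    # Stand-in for the bucket transform; Spark's real transform hashes first.
    return value % n

def shuffle_to_match(rows, key_fn, num_buckets):
    """Repartition the non-key-grouped side so its rows land in the same
    bucket layout the KeyGroupedPartitioning side already has, letting the
    join proceed with only this one side shuffled."""
    parts = defaultdict(list)
    for row in rows:
        parts[bucket(num_buckets, key_fn(row))].append(row)
    return dict(parts)

purchases = [(1, 42.0), (2, 11.0), (9, 19.5)]   # (item_id, price)
parts = shuffle_to_match(purchases, key_fn=lambda r: r[0], num_buckets=8)
assert parts[1] == [(1, 42.0), (9, 19.5)]       # 1 % 8 == 9 % 8 == 1
assert parts[2] == [(2, 11.0)]
```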






[jira] [Updated] (SPARK-47094) SPJ : Dynamically rebalance number of buckets when they are not equal

2024-02-26 Thread Szehon Ho (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-47094:
--
Parent: SPARK-37375
Issue Type: Sub-task  (was: New Feature)

> SPJ : Dynamically rebalance number of buckets when they are not equal
> -
>
> Key: SPARK-47094
> URL: https://issues.apache.org/jira/browse/SPARK-47094
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.3.0, 3.4.0
>Reporter: Himadri Pal
>Priority: Major
>  Labels: pull-request-available
>
> SPJ: Storage Partition Join works with Iceberg tables when both tables have 
> the same number of buckets. As part of this feature request, we would like 
> Spark to gather the bucket-count information from both tables and dynamically 
> rebalance the number of buckets by coalesce or repartition so that SPJ will 
> work. In this case we would still have to shuffle, but it would be better 
> than no SPJ.
> Use Case: 
> Many times we do not have control of the input tables, hence it is not 
> possible to change the partitioning scheme on those tables. As a consumer, we 
> would still like them to be usable with SPJ when joined with other tables and 
> output tables which have a different number of buckets.
> In these scenarios, we would need to read those tables and rewrite them with 
> a matching number of buckets for SPJ to work; this extra step could outweigh 
> the benefits of less shuffle via SPJ. Also, when multiple different tables 
> are being joined, each table needs to be rewritten with a matching number of 
> buckets. 
> If this feature is implemented, SPJ functionality will be more powerful.
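The bucket-count rebalancing described above can be sketched with a toy coalesce (assuming modulo-style bucketing and that one bucket count divides the other; an illustration, not the proposed implementation):

```python
def coalesce_buckets(parts, to_n):
    """Merge len(parts) buckets down to to_n buckets (to_n must divide the
    original count), so a table with more buckets can line up with one that
    has fewer and SPJ can still apply."""
    from_n = len(parts)
    assert from_n % to_n == 0, "bucket counts must be compatible"
    out = [[] for _ in range(to_n)]
    for i, rows in enumerate(parts):
        # With modulo bucketing, value % 8 == i implies value % 4 == i % 4,
        # so merging bucket i into bucket i % to_n preserves co-location.
        out[i % to_n].extend(rows)
    return out

eight = [[i] for i in range(8)]          # bucket i of an 8-bucket table
assert coalesce_buckets(eight, 4) == [[0, 4], [1, 5], [2, 6], [3, 7]]
```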






[jira] [Updated] (SPARK-44647) Support SPJ when join key is subset of partition keys

2023-08-05 Thread Szehon Ho (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-44647:
--
Parent: SPARK-37375
Issue Type: Sub-task  (was: New Feature)

> Support SPJ when join key is subset of partition keys
> -
>
> Key: SPARK-44647
> URL: https://issues.apache.org/jira/browse/SPARK-44647
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Szehon Ho
>Priority: Major
>







[jira] [Updated] (SPARK-44641) Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet

2023-08-02 Thread Szehon Ho (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-44641:
--
Description: 
Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
problem.

 
{code:java}
test("test join key is the second partition key and a transform") {
  val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
  createTable(items, items_schema, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
  createTable(purchases, purchases_schema, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
s"(3, 19.5, cast('2020-02-01' as timestamp))")

  withSQLConf(
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
  "true") {
val df = sql("SELECT id, name, i.price as purchase_price, " +
  "p.item_id, p.price as sale_price " +
  s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
  "ON i.arrive_time = p.time " +
  "ORDER BY id, purchase_price, p.item_id, sale_price")

val shuffles = collectShuffles(df.queryExecution.executedPlan)
assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys are partition keys")
checkAnswer(df,
  Seq(
Row(1, "aa", 40.0, 1, 42.0),
Row(1, "aa", 40.0, 2, 11.0),
Row(1, "aa", 41.0, 1, 44.0),
Row(1, "aa", 41.0, 1, 45.0),
Row(2, "bb", 10.0, 1, 42.0),
Row(2, "bb", 10.0, 2, 11.0),
Row(2, "bb", 10.5, 1, 42.0),
Row(2, "bb", 10.5, 2, 11.0),
Row(3, "cc", 15.5, 3, 19.5)
  )
)
  }
}{code}
 

Note: this test has set up the DataSourceV2 to return multiple splits for the 
same partition.

In this case, SPJ is not triggered (because the join keys do not match the 
partition keys), but the following code in DSV2Scan:

[https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]

intended to fill in empty partitions for the pushed-down partition values will 
still iterate through the non-grouped partitions and look up the grouped 
partitions to fill the map, resulting in duplicate input data being fed into 
the join.
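The duplication mechanism can be reproduced with a toy model (illustrative only; the real logic is in BatchScanExec's handling of grouped vs. ungrouped input partitions):

```python
from collections import defaultdict

# Toy model of the bug: each (partition_value, split) is a scan task. Grouping
# merges splits of the same partition; the buggy fill loop walks the
# *ungrouped* splits and appends the *grouped* rows once per split.
splits = [("2020-01-01", [1, 2]), ("2020-01-01", [3]), ("2020-01-15", [4])]

grouped = defaultdict(list)
for value, rows in splits:
    grouped[value].extend(rows)

buggy_input = []
for value, _ in splits:          # one lookup per split, not per partition
    buggy_input.extend(grouped[value])

assert buggy_input == [1, 2, 3, 1, 2, 3, 4]   # rows 1, 2, 3 fed in twice
correct = [r for rows in grouped.values() for r in rows]
assert correct == [1, 2, 3, 4]
```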


[jira] [Created] (SPARK-44647) Support SPJ when join key is subset of partition keys

2023-08-02 Thread Szehon Ho (Jira)
Szehon Ho created SPARK-44647:
-

 Summary: Support SPJ when join key is subset of partition keys
 Key: SPARK-44647
 URL: https://issues.apache.org/jira/browse/SPARK-44647
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 3.5.0
Reporter: Szehon Ho









[jira] [Updated] (SPARK-44641) Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet

2023-08-02 Thread Szehon Ho (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-44641:
--
Description: 
Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
problem.

 
{code:java}
test("test join key is the second partition key and a transform") {
  val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
  createTable(items, items_schema, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
  createTable(purchases, purchases_schema, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
s"(3, 19.5, cast('2020-02-01' as timestamp))")

  withSQLConf(
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
  "true") {
val df = sql("SELECT id, name, i.price as purchase_price, " +
  "p.item_id, p.price as sale_price " +
  s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
  "ON i.arrive_time = p.time " +
  "ORDER BY id, purchase_price, p.item_id, sale_price")

val shuffles = collectShuffles(df.queryExecution.executedPlan)
assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys are partition keys")
checkAnswer(df,
  Seq(
Row(1, "aa", 40.0, 1, 42.0),
Row(1, "aa", 40.0, 2, 11.0),
Row(1, "aa", 41.0, 1, 44.0),
Row(1, "aa", 41.0, 1, 45.0),
Row(2, "bb", 10.0, 1, 42.0),
Row(2, "bb", 10.0, 2, 11.0),
Row(2, "bb", 10.5, 1, 42.0),
Row(2, "bb", 10.5, 2, 11.0),
Row(3, "cc", 15.5, 3, 19.5)
  )
)
  }
}{code}
 

This tests the case where the DataSourceV2 returns multiple splits for the 
same partition.

In this case, SPJ is not triggered (because the join keys do not match the 
partition keys), but the following code in DSV2Scan:

[https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]

intended to fill in empty partitions for the pushed-down partition values will 
still iterate through the non-grouped partitions and look up the grouped 
partitions to fill the map, resulting in duplicate input data being fed into 
the join.


[jira] [Created] (SPARK-44641) Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet

2023-08-02 Thread Szehon Ho (Jira)
Szehon Ho created SPARK-44641:
-

 Summary: Results duplicated when SPJ partial-cluster and pushdown 
enabled but conditions unmet
 Key: SPARK-44641
 URL: https://issues.apache.org/jira/browse/SPARK-44641
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.1
Reporter: Szehon Ho


Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
problem.

 
{code:java}
test("test join key is the second partition key and a transform") {
  val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
  createTable(items, items_schema, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
  createTable(purchases, purchases_schema, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
s"(3, 19.5, cast('2020-02-01' as timestamp))")

  withSQLConf(
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
  "true") {
val df = sql("SELECT id, name, i.price as purchase_price, " +
  "p.item_id, p.price as sale_price " +
  s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
  "ON i.arrive_time = p.time " +
  "ORDER BY id, purchase_price, p.item_id, sale_price")

val shuffles = collectShuffles(df.queryExecution.executedPlan)
assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys are partition keys")
checkAnswer(df,
  Seq(
Row(1, "aa", 40.0, 1, 42.0),
Row(1, "aa", 40.0, 2, 11.0),
Row(1, "aa", 41.0, 1, 44.0),
Row(1, "aa", 41.0, 1, 45.0),
Row(2, "bb", 10.0, 1, 42.0),
Row(2, "bb", 10.0, 2, 11.0),
Row(2, "bb", 10.5, 1, 42.0),
Row(2, "bb", 10.5, 2, 11.0),
Row(3, "cc", 15.5, 3, 19.5)
  )
)
  }
}{code}
 

This tests the case where the DataSourceV2 returns multiple splits for the 
same partition.

In this case, SPJ is not triggered, but the following code in DSV2Scan:

https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194

intended to fill in empty partitions for the pushed-down partition values will 
still iterate through the non-grouped partitions and look up the grouped 
partitions to fill the map, resulting in duplicate input data being fed into 
the join.






[jira] [Updated] (SPARK-44060) Code-gen for build side outer shuffled hash join

2023-06-14 Thread Szehon Ho (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-44060:
--
Description: 
Here, build-side outer join means a LEFT OUTER join with build on the left, or 
a RIGHT OUTER join with build on the right.

As a follow-up to https://github.com/apache/spark/pull/41398 (non-codegen 
build-side outer shuffled hash join), this task is to add code-gen for it.
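As a rough illustration of what the non-codegen version computes, here is a pure-Python sketch of a left outer join built on the left side (hypothetical names; not Spark's operator):

```python
def left_outer_build_left(left, right, key=lambda r: r[0]):
    """LEFT OUTER join with the *left* side as the hash-table build side:
    probe with the streamed right side, then emit build rows that never
    matched, padded with None."""
    table = {}
    for i, row in enumerate(left):
        table.setdefault(key(row), []).append(i)
    matched = set()
    out = []
    for r in right:                      # probe with the streamed (right) side
        for i in table.get(key(r), []):
            out.append((left[i], r))
            matched.add(i)
    for i, l in enumerate(left):         # build-side rows that never matched
        if i not in matched:
            out.append((l, None))
    return out

rows = left_outer_build_left([(1, "a"), (2, "b")], [(1, "x")])
assert rows == [((1, "a"), (1, "x")), ((2, "b"), None)]
```

Tracking which build rows matched is the extra bookkeeping that makes the build-side outer variant harder to code-generate than the streamed-side one.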

> Code-gen for build side outer shuffled hash join
> 
>
> Key: SPARK-44060
> URL: https://issues.apache.org/jira/browse/SPARK-44060
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Szehon Ho
>Priority: Major
>
> Here, build side outer join means LEFT OUTER join with build left, or RIGHT 
> OUTER join with build right.
> As a followup for https://github.com/apache/spark/pull/41398 (non-codegen 
> build-side outer shuffled hash join), this task is to add code-gen for it.






[jira] [Updated] (SPARK-44060) Code-gen for build side outer shuffled hash join

2023-06-14 Thread Szehon Ho (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-44060:
--
Description: 
Here, build-side outer join means a LEFT OUTER join with build on the left, or 
a RIGHT OUTER join with build on the right.

As a follow-up to https://github.com/apache/spark/pull/41398 (SPARK-36612, 
non-codegen build-side outer shuffled hash join), this task is to add code-gen 
for it.

  was:
Here, build side outer join means LEFT OUTER join with build left, or RIGHT 
OUTER join with build right.

As a followup for https://github.com/apache/spark/pull/41398 (non-codegen 
build-side outer shuffled hash join), this task is to add code-gen for it.


> Code-gen for build side outer shuffled hash join
> 
>
> Key: SPARK-44060
> URL: https://issues.apache.org/jira/browse/SPARK-44060
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Szehon Ho
>Priority: Major
>
> Here, build side outer join means LEFT OUTER join with build left, or RIGHT 
> OUTER join with build right.
> As a followup for https://github.com/apache/spark/pull/41398/ SPARK-36612 
> (non-codegen build-side outer shuffled hash join), this task is to add 
> code-gen for it.






[jira] [Created] (SPARK-44060) Code-gen for build side outer shuffled hash join

2023-06-14 Thread Szehon Ho (Jira)
Szehon Ho created SPARK-44060:
-

 Summary: Code-gen for build side outer shuffled hash join
 Key: SPARK-44060
 URL: https://issues.apache.org/jira/browse/SPARK-44060
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.5.0
Reporter: Szehon Ho









[jira] [Created] (SPARK-3186) Enable parallelism for Reduce Side Join [Spark Branch]

2014-08-22 Thread Szehon Ho (JIRA)
Szehon Ho created SPARK-3186:


 Summary: Enable parallelism for Reduce Side Join [Spark Branch] 
 Key: SPARK-3186
 URL: https://issues.apache.org/jira/browse/SPARK-3186
 Project: Spark
  Issue Type: Bug
Reporter: Szehon Ho


Blocked by SPARK-2978.  See parent JIRA for design details.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3186) Enable parallelism for Reduce Side Join [Spark Branch]

2014-08-22 Thread Szehon Ho (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-3186:
-

Description: (was: Blocked by SPARK-2978.  See parent JIRA for design 
details.)

 Enable parallelism for Reduce Side Join [Spark Branch] 
 ---

 Key: SPARK-3186
 URL: https://issues.apache.org/jira/browse/SPARK-3186
 Project: Spark
  Issue Type: Bug
Reporter: Szehon Ho








[jira] [Resolved] (SPARK-3186) Enable parallelism for Reduce Side Join [Spark Branch]

2014-08-22 Thread Szehon Ho (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho resolved SPARK-3186.
--

Resolution: Invalid

Sorry, please ignore; I meant to file this in the Hive project.

 Enable parallelism for Reduce Side Join [Spark Branch] 
 ---

 Key: SPARK-3186
 URL: https://issues.apache.org/jira/browse/SPARK-3186
 Project: Spark
  Issue Type: Bug
Reporter: Szehon Ho

 Blocked by SPARK-2978.  See parent JIRA for design details.






[jira] [Closed] (SPARK-3186) Enable parallelism for Reduce Side Join [Spark Branch]

2014-08-22 Thread Szehon Ho (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho closed SPARK-3186.



 Enable parallelism for Reduce Side Join [Spark Branch] 
 ---

 Key: SPARK-3186
 URL: https://issues.apache.org/jira/browse/SPARK-3186
 Project: Spark
  Issue Type: Bug
Reporter: Szehon Ho




