[jira] [Updated] (SPARK-45282) Join loses records for cached datasets

2023-11-09 Thread ASF GitHub Bot (Jira)


[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-45282:
---
Labels: CorrectnessBug correctness pull-request-available  (was: CorrectnessBug correctness)

> Join loses records for cached datasets
> --
>
> Key: SPARK-45282
> URL: https://issues.apache.org/jira/browse/SPARK-45282
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.1, 3.5.0
> Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or 
> databricks 13.3
>Reporter: koert kuipers
>Priority: Blocker
>  Labels: CorrectnessBug, correctness, pull-request-available
>
> We observed this issue on Spark 3.4.1, and it is also present in 3.5.0. It is not present in Spark 3.3.1.
> It only shows up in a distributed environment; we could not replicate it in a unit test. It did, however, show up on a Hadoop cluster, on Kubernetes, and on Databricks 13.3.
> The issue is that records are dropped when two cached DataFrames are joined. In Spark 3.4.1 some Exchanges appear to be dropped from the query plan as an optimization, while in Spark 3.3.1 these Exchanges are still present. It appears to be an issue with AQE when canChangeCachedPlanOutputPartitioning=true.
> To reproduce on a distributed cluster, the following settings are needed:
> {code:java}
> spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
> spark.sql.adaptive.coalescePartitions.parallelismFirst false
> spark.sql.adaptive.enabled true
> spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
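> For completeness, a minimal sketch of applying these settings when building the session (the appName is arbitrary and only illustrative; the settings can equally go in spark-defaults.conf or be passed via --conf on the spark-submit command line):
> {code:java}
> import org.apache.spark.sql.SparkSession
> 
> // hypothetical session setup carrying the four settings listed above
> val spark = SparkSession.builder()
>   .appName("spark-45282-repro")
>   .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "33554432")
>   .config("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
>   .config("spark.sql.adaptive.enabled", "true")
>   .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
>   .getOrCreate()
> {code}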
> Scala code to reproduce:
> {code:java}
> import java.util.UUID
> import org.apache.spark.sql.functions.col
> import spark.implicits._
> val data = (1 to 1000000).toDS().map(i => UUID.randomUUID().toString).persist()
> val left = data.map(k => (k, 1))
> val right = data.map(k => (k, k)) // if I change this to k => (k, 1) it works!
> println("number of left " + left.count())
> println("number of right " + right.count())
> println("number of (left join right) " +
>   left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count()
> )
> val left1 = left
>   .toDF("key", "value1")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of left1 " + left1.count())
> val right1 = right
>   .toDF("key", "value2")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of right1 " + right1.count())
> println("number of (left1 join right1) " +  left1.join(right1, 
> "key").count()) // this gives incorrect result{code}
> This produces the following output:
> {code:java}
> number of left 1000000
> number of right 1000000
> number of (left join right) 1000000
> number of left1 1000000
> number of right1 1000000
> number of (left1 join right1) 859531 {code}
> Note that the last number (the incorrect one) varies with settings, cluster size, etc.
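> To check whether the Exchanges were actually elided, one can compare the physical plans on both versions (a sketch; explain() prints the physical plan of the join):
> {code:java}
> // Assumption: on 3.3.1 an Exchange hashpartitioning(key, ...) appears on each
> // side of the join, while on 3.4.1 with the settings above it may be missing.
> left1.join(right1, "key").explain()
> {code}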
>  
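> A possible mitigation, assuming the config named above gates the faulty optimization (an inference from this analysis, not a confirmed fix), is to turn it off before anything is cached:
> {code:java}
> // must be set before the DataFrames are cached and planned
> spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
> {code}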






[jira] [Updated] (SPARK-45282) Join loses records for cached datasets

2023-10-31 Thread Dongjoon Hyun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-45282:
--
Target Version/s: 3.4.2, 3.5.1  (was: 3.4.2)







[jira] [Updated] (SPARK-45282) Join loses records for cached datasets

2023-10-30 Thread Dongjoon Hyun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-45282:
--
Priority: Blocker  (was: Major)







[jira] [Updated] (SPARK-45282) Join loses records for cached datasets

2023-10-30 Thread Dongjoon Hyun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-45282:
--
Target Version/s: 3.4.2







[jira] [Updated] (SPARK-45282) Join loses records for cached datasets

2023-09-22 Thread koert kuipers (Jira)


[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

koert kuipers updated SPARK-45282:
--
Description: 
We observed this issue on Spark 3.4.1, and it is also present in 3.5.0. It is not present in Spark 3.3.1.

It only shows up in a distributed environment; we could not replicate it in a unit test. It did, however, show up on a Hadoop cluster, on Kubernetes, and on Databricks 13.3.

The issue is that records are dropped when two cached DataFrames are joined. In Spark 3.4.1 some Exchanges appear to be dropped from the query plan as an optimization, while in Spark 3.3.1 these Exchanges are still present. It appears to be an issue with AQE when canChangeCachedPlanOutputPartitioning=true.

To reproduce on a distributed cluster, the following settings are needed:
{code:java}
spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
spark.sql.adaptive.coalescePartitions.parallelismFirst false
spark.sql.adaptive.enabled true
spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
Scala code to reproduce:
{code:java}
import java.util.UUID
import org.apache.spark.sql.functions.col

import spark.implicits._

val data = (1 to 1000000).toDS().map(i => UUID.randomUUID().toString).persist()

val left = data.map(k => (k, 1))
val right = data.map(k => (k, k)) // if I change this to k => (k, 1) it works!
println("number of left " + left.count())
println("number of right " + right.count())
println("number of (left join right) " +
  left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count()
)

val left1 = left
  .toDF("key", "value1")
  .repartition(col("key")) // comment out this line to make it work
  .persist()
println("number of left1 " + left1.count())

val right1 = right
  .toDF("key", "value2")
  .repartition(col("key")) // comment out this line to make it work
  .persist()
println("number of right1 " + right1.count())

println("number of (left1 join right1) " +  left1.join(right1, "key").count()) 
// this gives incorrect result{code}
This produces the following output:
{code:java}
number of left 1000000
number of right 1000000
number of (left join right) 1000000
number of left1 1000000
number of right1 1000000
number of (left1 join right1) 859531 {code}
Note that the last number (the incorrect one) varies with settings, cluster size, etc.

 

  was:
We observed this issue on Spark 3.4.1, and it is also present in 3.5.0. It is not present in Spark 3.3.1.

It only shows up in a distributed environment; we could not replicate it in a unit test. It did, however, show up on a Hadoop cluster, on Kubernetes, and on Databricks 13.3.

The issue is that records are dropped when two cached DataFrames are joined. In Spark 3.4.1 some Exchanges appear to be dropped from the query plan as an optimization, while in Spark 3.3.1 these Exchanges are still present. It appears to be an issue with AQE when canChangeCachedPlanOutputPartitioning=true.

To reproduce on a distributed cluster, the following settings are needed:
{code:java}
spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
spark.sql.adaptive.coalescePartitions.parallelismFirst false
spark.sql.adaptive.enabled true
spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
Scala code to reproduce:
{code:java}
import java.util.UUID
import org.apache.spark.sql.functions.col

import spark.implicits._

val data = (1 to 1000000).toDS().map(i => UUID.randomUUID().toString).persist()

val left = data.map(k => (k, 1))
val right = data.map(k => (k, k)) // if I change this to k => (k, 1) it works!
println("number of left " + left.count())
println("number of right " + right.count())
println("number of (left join right) " +
  left.toDF("key", "vertex").join(right.toDF("key", "state"), "key").count()
)

val left1 = left
  .toDF("key", "vertex")
  .repartition(col("key")) // comment out this line to make it work
  .persist()
println("number of left1 " + left1.count())

val right1 = right
  .toDF("key", "state")
  .repartition(col("key")) // comment out this line to make it work
  .persist()
println("number of right1 " + right1.count())

println("number of (left1 join right1) " +  left1.join(right1, "key").count()) 
// this gives incorrect result{code}
This produces the following output:
{code:java}
number of left 1000000
number of right 1000000
number of (left join right) 1000000
number of left1 1000000
number of right1 1000000
number of (left1 join right1) 859531 {code}
Note that the last number (the incorrect one) varies with settings, cluster size, etc.

 


