[ 
https://issues.apache.org/jira/browse/SPARK-32148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael updated SPARK-32148:
----------------------------
    Description: 
When upgrading from Spark 2.4.6 to 3.0.0 I found that previously working LEFT 
JOINs now output unexpected results.

Below is a minimal example to run in {{spark-shell}} to demonstrate this. In it 
there are 3 events on the left side of the join and two on the right.
 The expected output should contain two matching pairs and one item on the left 
side without a matching right side, so that it should be output with the right 
side fields being {{NULL}}.
{code:scala}
import spark.implicits._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.OutputMode
import java.sql.Timestamp
import java.util.UUID

// Structure of left and right data items
case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
case class RightEntry(eventTime: Timestamp, id: String, name: String)

// Some test data
val leftData = Vector(
  LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join 
partner"),
  LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
  LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
)

val rightData = Vector(
  RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
  RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
)

// Write test data, that we will stream from later (random output directories; 
alternatively we could delete the directories after each run)
val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
spark.createDataset(rightData).write.format("parquet").save(rightFilePath)

// Read data from Parquet as stream
val leftStream = spark.readStream
  .schema(Encoders.product[LeftEntry].schema)
  .parquet(leftFilePath)
  .withWatermark("eventTime", "2 minutes")
val rightStream = spark.readStream
  .schema(Encoders.product[RightEntry].schema)
  .parquet(rightFilePath)
  .withWatermark("eventTime", "4 minutes")

// Define Join
val joinExpression = expr(
  s"""
     |leftStream.id = rightStream.id AND
     |leftStream.eventTime BETWEEN
     |  rightStream.eventTime - INTERVAL 30 seconds AND
     |  rightStream.eventTime + INTERVAL 30 seconds
    """.stripMargin
)
val joinedData = leftStream.as("leftStream")
  .join(
    rightStream.as("rightStream"),
    joinExpression,
    "left"
  )

// Run query
val query = joinedData.writeStream
  .format("memory")
  .queryName("myQuery")
  .outputMode(OutputMode.Append())
  .start()
query.processAllAvailable()

// Print results
spark
  .table(query.name)
  .show(truncate = false)
{code}
When this is executed with Spark 2.4.6, the result is as expected and 
deterministic:
{noformat}
+-------------------+---+-------------------+-------------------+----+----+
|eventTime          |id |comment            |eventTime          |id  |name|
+-------------------+---+-------------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
|2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  ← 
as expected
+-------------------+---+-------------------+-------------------+----+----+
{noformat}
When running the same code snippet with Spark 3.0.0, the result is 
non-deterministically one of these two:
{noformat}
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this 
entry was already joined with "A" above,
+-------------------+---+-------------+-------------------+----+----+    but is 
now here once more without it's right join side
{noformat}
{noformat}
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this 
entry was already joined with "B" above,
+-------------------+---+-------------+-------------------+----+----+    but is 
now here once more without it's right join side
{noformat}
... with {{"has no join partner"}} completely missing, and instead one of the 
actually joinable left-side items repeated without the right-side fields.

  was:
When upgrading from Spark 2.4.6 to 3.0.0 I found that previously working LEFT 
JOINs now output unexpected results.

Below is a minimal example to run in {{spark-shell}} to demonstrate this. In it 
there are 3 events on the left side of the join and two on the right.
 The expected output should contain two matching pairs and one item on the left 
side without a matching right side, so that it should be output with the right 
side fields being {{NULL}}.
{code:scala}
import spark.implicits._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.OutputMode
import java.sql.Timestamp
import java.util.UUID

// Structure of left and right data items
case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
case class RightEntry(eventTime: Timestamp, id: String, name: String)

// Some test data
val leftData = Vector(
  LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join 
partner"),
  LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
  LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
)

val rightData = Vector(
  RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
  RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
)

// Write test data, that we will stream from later (random output directories; 
alternatively we could delete the directories after each run)
val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
spark.createDataset(rightData).write.format("parquet").save(rightFilePath)

// Read data from Parquet as stream
val leftStream = spark.readStream
  .schema(Encoders.product[LeftEntry].schema)
  .parquet(leftFilePath)
  .withWatermark("eventTime", "2 minutes")
val rightStream = spark.readStream
  .schema(Encoders.product[RightEntry].schema)
  .parquet(rightFilePath)
  .withWatermark("eventTime", "4 minutes")

// Define Join
val joinExpression = expr(
  s"""
     |leftStream.id = rightStream.id AND
     |leftStream.eventTime BETWEEN
     |  rightStream.eventTime - INTERVAL 30 seconds AND
     |  rightStream.eventTime + INTERVAL 30 seconds
    """.stripMargin
)
val joinedData = leftStream.as("leftStream")
  .join(
    rightStream.as("rightStream"),
    joinExpression,
    "left"
  )

// Run query
val query = joinedData.writeStream
  .format("memory")
  .queryName("myQuery")
  .outputMode(OutputMode.Append())
  .start()
query.processAllAvailable()

// Print results
spark
  .table(query.name)
  .show(truncate = false)
{code}
When this is executed with Spark 2.4.6, the result is as expected and 
deterministic:
{code:java}
+-------------------+---+-------------------+-------------------+----+----+
|eventTime          |id |comment            |eventTime          |id  |name|
+-------------------+---+-------------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
|2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  ← 
as expected
+-------------------+---+-------------------+-------------------+----+----+
{code}
When running the same code snippet with Spark 3.0.0, the result is 
non-deterministically one of these two:
{code:java}
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this 
entry was already joined with "A" above,
+-------------------+---+-------------+-------------------+----+----+    but is 
now here once more without it's right join side
{code}
{code:java}
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this 
entry was already joined with "B" above,
+-------------------+---+-------------+-------------------+----+----+    but is 
now here once more without it's right join side
{code}
... with {{"has no join partner"}} completely missing, and instead one of the 
actually joinable left-side items repeated without the right-side fields.


> LEFT JOIN generating non-deterministic and unexpected result (regression in 
> Spark 3.0)
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-32148
>                 URL: https://issues.apache.org/jira/browse/SPARK-32148
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Michael
>            Priority: Major
>
> When upgrading from Spark 2.4.6 to 3.0.0 I found that previously working LEFT 
> JOINs now output unexpected results.
> Below is a minimal example to run in {{spark-shell}} to demonstrate this. In 
> it there are 3 events on the left side of the join and two on the right.
>  The expected output should contain two matching pairs and one item on the 
> left side without a matching right side, so that it should be output with the 
> right side fields being {{NULL}}.
> {code:scala}
> import spark.implicits._
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.functions.expr
> import org.apache.spark.sql.streaming.OutputMode
> import java.sql.Timestamp
> import java.util.UUID
> // Structure of left and right data items
> case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
> case class RightEntry(eventTime: Timestamp, id: String, name: String)
> // Some test data
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join 
> partner"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> val rightData = Vector(
>   RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
>   RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
> )
> // Write test data, that we will stream from later (random output 
> directories; alternatively we could delete the directories after each run)
> val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
> spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
> val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
> spark.createDataset(rightData).write.format("parquet").save(rightFilePath)
> // Read data from Parquet as stream
> val leftStream = spark.readStream
>   .schema(Encoders.product[LeftEntry].schema)
>   .parquet(leftFilePath)
>   .withWatermark("eventTime", "2 minutes")
> val rightStream = spark.readStream
>   .schema(Encoders.product[RightEntry].schema)
>   .parquet(rightFilePath)
>   .withWatermark("eventTime", "4 minutes")
> // Define Join
> val joinExpression = expr(
>   s"""
>      |leftStream.id = rightStream.id AND
>      |leftStream.eventTime BETWEEN
>      |  rightStream.eventTime - INTERVAL 30 seconds AND
>      |  rightStream.eventTime + INTERVAL 30 seconds
>     """.stripMargin
> )
> val joinedData = leftStream.as("leftStream")
>   .join(
>     rightStream.as("rightStream"),
>     joinExpression,
>     "left"
>   )
> // Run query
> val query = joinedData.writeStream
>   .format("memory")
>   .queryName("myQuery")
>   .outputMode(OutputMode.Append())
>   .start()
> query.processAllAvailable()
> // Print results
> spark
>   .table(query.name)
>   .show(truncate = false)
> {code}
> When this is executed with Spark 2.4.6, the result is as expected and 
> deterministic:
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  
> ← as expected
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}
> When running the same code snippet with Spark 3.0.0, the result is 
> non-deterministically one of these two:
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this 
> entry was already joined with "A" above,
> +-------------------+---+-------------+-------------------+----+----+    but 
> is now here once more without it's right join side
> {noformat}
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this 
> entry was already joined with "B" above,
> +-------------------+---+-------------+-------------------+----+----+    but 
> is now here once more without it's right join side
> {noformat}
> ... with {{"has no join partner"}} completely missing, and instead one of the 
> actually joinable left-side items repeated without the right-side fields.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to