Michael created SPARK-32148:
-------------------------------

             Summary: LEFT JOIN generating non-deterministic and unexpected result (regression in Spark 3.0)
                 Key: SPARK-32148
                 URL: https://issues.apache.org/jira/browse/SPARK-32148
             Project: Spark
          Issue Type: Bug
          Components: SQL, Structured Streaming
    Affects Versions: 3.0.0
            Reporter: Michael


When upgrading from Spark 2.4.6 to 3.0.0 I found that previously working LEFT 
JOINs now output unexpected results.

Below is a minimal example, to be run in {{spark-shell}}, that demonstrates 
this. It has three events on the left side of the join and two on the right.
The expected output contains two matching pairs plus one left-side item that 
has no matching right side and should therefore be output with its right-side 
fields set to {{NULL}}.
{code:scala}
import spark.implicits._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.OutputMode
import java.sql.Timestamp
import java.util.UUID

// Structure of left and right data items
case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
case class RightEntry(eventTime: Timestamp, id: String, name: String)

// Some test data
val leftData = Vector(
  LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
  LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
  LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
)

val rightData = Vector(
  RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
  RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
)

// Write the test data that we will stream from later (random output
// directories; alternatively we could delete the directories after each run)
val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
spark.createDataset(rightData).write.format("parquet").save(rightFilePath)

// Read data from Parquet as stream
val leftStream = spark.readStream
  .schema(Encoders.product[LeftEntry].schema)
  .parquet(leftFilePath)
  .withWatermark("eventTime", "2 minutes")
val rightStream = spark.readStream
  .schema(Encoders.product[RightEntry].schema)
  .parquet(rightFilePath)
  .withWatermark("eventTime", "4 minutes")

// Define Join
val joinExpression = expr(
  s"""
     |leftStream.id = rightStream.id AND
     |leftStream.eventTime BETWEEN
     |  rightStream.eventTime - INTERVAL 30 seconds AND
     |  rightStream.eventTime + INTERVAL 30 seconds
    """.stripMargin
)
val joinedData = leftStream.as("leftStream")
  .join(
    rightStream.as("rightStream"),
    joinExpression,
    "left"
  )

// Run query
val query = joinedData.writeStream
  .format("memory")
  .queryName("myQuery")
  .outputMode(OutputMode.Append())
  .start()
query.processAllAvailable()

// Print results
spark
  .table(query.name)
  .show(truncate = false)
{code}
When this is executed with Spark 2.4.6, the result is as expected and 
deterministic:
{code:java}
+-------------------+---+-------------------+-------------------+----+----+
|eventTime          |id |comment            |eventTime          |id  |name|
+-------------------+---+-------------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
|2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  ← as expected
+-------------------+---+-------------------+-------------------+----+----+
{code}
When running the same code snippet with Spark 3.0.0, the result is 
non-deterministically one of these two:
{code:java}
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this entry was already joined with "A" above,
+-------------------+---+-------------+-------------------+----+----+    but is now here once more without its right join side
{code}
{code:java}
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this entry was already joined with "B" above,
+-------------------+---+-------------+-------------------+----+----+    but is now here once more without its right join side
{code}
... with {{"has no join partner"}} completely missing, and instead one of the 
actually joinable left-side items repeated without the right-side fields.
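
For reference, the expected rows can be cross-checked without Structured 
Streaming. The following is only a sketch using plain Scala collections: it 
applies the same join condition (equal {{id}}, left {{eventTime}} within 30 
seconds of the right {{eventTime}}) to the same test data. The names 
{{LeftRow}}, {{RightRow}}, {{matches}} and {{expected}} are made up for this 
illustration and are not part of the reproduction above. Watermarks play no 
role here, so it shows only the intended left outer join semantics, not what 
the streaming engine does.
{code:scala}
import java.sql.Timestamp

// Hypothetical stand-ins for LeftEntry / RightEntry from the example above
case class LeftRow(eventTime: Timestamp, id: String, comment: String)
case class RightRow(eventTime: Timestamp, id: String, name: String)

val lefts = Vector(
  LeftRow(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
  LeftRow(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
  LeftRow(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
)
val rights = Vector(
  RightRow(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
  RightRow(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
)

// Same condition as joinExpression: equal ids and
// left.eventTime BETWEEN right.eventTime - 30s AND right.eventTime + 30s
def matches(l: LeftRow, r: RightRow): Boolean =
  l.id == r.id &&
    math.abs(l.eventTime.getTime - r.eventTime.getTime) <= 30000L

// Left outer join: every left row appears at least once;
// a left row without any partner is paired with None (i.e. NULL)
val expected: Vector[(LeftRow, Option[RightRow])] = lefts.flatMap { l =>
  val partners = rights.filter(r => matches(l, r))
  if (partners.isEmpty) Vector((l, Option.empty[RightRow]))
  else partners.map(r => (l, Option(r)))
}

expected.foreach { case (l, r) =>
  println(s"${l.comment} -> ${r.map(_.name).getOrElse("null")}")
}
{code}
Under these assumptions each left-side item appears exactly once, and only 
{{"has no join partner"}} ends up without a right side, which matches the 
Spark 2.4.6 output shown above.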


