[ 
https://issues.apache.org/jira/browse/SPARK-32148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32148:
------------------------------------

    Assignee: Apache Spark

> LEFT JOIN generating non-deterministic and unexpected result (regression in 
> Spark 3.0)
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-32148
>                 URL: https://issues.apache.org/jira/browse/SPARK-32148
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Michael
>            Assignee: Apache Spark
>            Priority: Blocker
>
> When upgrading from Spark 2.4.6 to 3.0.0, I found that previously working LEFT 
> JOINs now produce unexpected results.
> Below is a minimal example to run in {{spark-shell}} that demonstrates this. 
> It has three events on the left side of the join and two on the right.
> The expected output contains two matching pairs plus the one left-side event 
> that has no match, emitted with its right-side fields set to {{NULL}}. The 
> join condition is that the IDs must match and the event times must be at most 
> 30 seconds apart.
> {code:scala}
> import spark.implicits._
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.functions.expr
> import org.apache.spark.sql.streaming.OutputMode
> import java.sql.Timestamp
> import java.util.UUID
> // Structure of left and right data items
> case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
> case class RightEntry(eventTime: Timestamp, id: String, name: String)
> // Some test data
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> val rightData = Vector(
>   RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
>   RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
> )
> // Write test data that we will stream from later (random output directories;
> // alternatively we could delete the directories after each run)
> val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
> spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
> val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
> spark.createDataset(rightData).write.format("parquet").save(rightFilePath)
> // Read data from Parquet as stream
> val leftStream = spark.readStream
>   .schema(Encoders.product[LeftEntry].schema)
>   .parquet(leftFilePath)
>   .withWatermark("eventTime", "2 minutes")
> val rightStream = spark.readStream
>   .schema(Encoders.product[RightEntry].schema)
>   .parquet(rightFilePath)
>   .withWatermark("eventTime", "4 minutes")
> // Define Join
> val joinExpression = expr(
>   s"""
>      |leftStream.id = rightStream.id AND
>      |leftStream.eventTime BETWEEN
>      |  rightStream.eventTime - INTERVAL 30 seconds AND
>      |  rightStream.eventTime + INTERVAL 30 seconds
>     """.stripMargin
> )
> val joinedData = leftStream.as("leftStream")
>   .join(
>     rightStream.as("rightStream"),
>     joinExpression,
>     "left"
>   )
> // Run query
> val query = joinedData.writeStream
>   .format("memory")
>   .queryName("myQuery")
>   .outputMode(OutputMode.Append())
>   .start()
> query.processAllAvailable()
> // Print results
> spark
>   .table(query.name)
>   .show(truncate = false)
> {code}
> When this is executed with Spark 2.4.6, the result is as expected and 
> deterministic:
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  ← as expected
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}
> When running the same code snippet with Spark 3.0.0, the result is 
> non-deterministically one of these two:
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this entry was already joined with "A" above,
> +-------------------+---+-------------+-------------------+----+----+    but is now here once more without its right join side
> {noformat}
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this entry was already joined with "B" above,
> +-------------------+---+-------------+-------------------+----+----+    but is now here once more without its right join side
> {noformat}
> ... with {{"has no join partner"}} completely missing, and instead one of the 
> actually joinable left-side items repeated without the right-side fields.
> ----
> If the input data is modified so that the non-joinable event also has a 
> different ID, Spark 3.0 generates the correct output:
> {code:scala}
> // [...]
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "ddd", "has no join partner"),
>                                                     // ↑↑↑ changed
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> // [...]
> {code}
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|ddd|has no join partner|null               |null|null|
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}
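
As a cross-check on the expected result, the join condition described above (matching IDs, event times at most 30 seconds apart) can be evaluated on the same test data in plain Scala, without Spark or streaming. This is only a minimal sketch: the names matches and expected are hypothetical helpers introduced for illustration, and it models left-outer-join semantics only, not Spark's watermark or state handling.

```scala
import java.sql.Timestamp

// Same shapes and test data as in the reproduction above
case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
case class RightEntry(eventTime: Timestamp, id: String, name: String)

val leftData = Vector(
  LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
  LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
  LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
)
val rightData = Vector(
  RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
  RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
)

// Hypothetical helper: same id, event times at most 30 seconds apart (inclusive, like BETWEEN)
def matches(l: LeftEntry, r: RightEntry): Boolean =
  l.id == r.id && math.abs(l.eventTime.getTime - r.eventTime.getTime) <= 30000L

// Left outer join: each left row is paired with every matching right row,
// or with None (standing in for NULL) when it has no match at all
val expected: Vector[(LeftEntry, Option[RightEntry])] = leftData.flatMap { l =>
  val partners: Vector[Option[RightEntry]] = rightData.filter(matches(l, _)).map(Option(_))
  val padded = if (partners.isEmpty) Vector(Option.empty[RightEntry]) else partners
  padded.map(r => (l, r))
}

expected.foreach(println)
```

On this data the sketch yields three rows, and the only row with an empty right side is the "has no join partner" event, which agrees with the Spark 2.4.6 output shown above.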



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
