Michael created SPARK-32148:
-------------------------------

             Summary: LEFT JOIN generating non-deterministic and unexpected result (regression in Spark 3.0)
                 Key: SPARK-32148
                 URL: https://issues.apache.org/jira/browse/SPARK-32148
             Project: Spark
          Issue Type: Bug
          Components: SQL, Structured Streaming
    Affects Versions: 3.0.0
            Reporter: Michael
When upgrading from Spark 2.4.6 to 3.0.0, I found that previously working stream-stream LEFT JOINs now produce unexpected results. Below is a minimal example, to be run in {{spark-shell}}, that demonstrates this. It has three events on the left side of the join and two on the right. The expected output contains two matching pairs plus the one left-side item that has no matching right-side item; that item should be output with all right-side fields set to {{NULL}}.

{code:scala}
import spark.implicits._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.OutputMode

import java.sql.Timestamp
import java.util.UUID

// Structure of left and right data items

case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
case class RightEntry(eventTime: Timestamp, id: String, name: String)

// Some test data

val leftData = Vector(
  LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
  LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
  LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
)

val rightData = Vector(
  RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
  RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
)

// Write the test data that we will stream from later
// (to random output directories; alternatively, we could delete the directories after each run)

val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
spark.createDataset(leftData).write.format("parquet").save(leftFilePath)

val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
spark.createDataset(rightData).write.format("parquet").save(rightFilePath)

// Read data from Parquet as stream

val leftStream = spark.readStream
  .schema(Encoders.product[LeftEntry].schema)
  .parquet(leftFilePath)
  .withWatermark("eventTime", "2 minutes")

val rightStream = spark.readStream
  .schema(Encoders.product[RightEntry].schema)
  .parquet(rightFilePath)
  .withWatermark("eventTime", "4 minutes")

// Define join

val joinExpression = expr(
  s"""
    |leftStream.id = rightStream.id AND
    |leftStream.eventTime BETWEEN
    |  rightStream.eventTime - INTERVAL 30 seconds AND
    |  rightStream.eventTime + INTERVAL 30 seconds
  """.stripMargin
)

val joinedData = leftStream.as("leftStream")
  .join(
    rightStream.as("rightStream"),
    joinExpression,
    "left"
  )

// Run query

val query = joinedData.writeStream
  .format("memory")
  .queryName("myQuery")
  .outputMode(OutputMode.Append())
  .start()

query.processAllAvailable()

// Print results

spark
  .table(query.name)
  .show(truncate = false)
{code}

When this is executed with Spark 2.4.6, the result is as expected and deterministic:

{code:java}
+-------------------+---+-------------------+-------------------+----+----+
|eventTime          |id |comment            |eventTime          |id  |name|
+-------------------+---+-------------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
|2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  ← as expected
+-------------------+---+-------------------+-------------------+----+----+
{code}

When the same code snippet is run with Spark 3.0.0, the result is non-deterministically one of the following two:

{code:java}
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this entry was already joined with "A" above, but appears again without its right join side
+-------------------+---+-------------+-------------------+----+----+
{code}

{code:java}
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this entry was already joined with "B" above, but appears again without its right join side
+-------------------+---+-------------+-------------------+----+----+
{code}

In both cases, the {{"has no join partner"}} row is missing entirely; instead, one of the left-side items that does have a join partner is repeated with its right-side fields set to {{null}}.