[ https://issues.apache.org/jira/browse/SPARK-32148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-32148:
------------------------------------

    Assignee: Apache Spark

> LEFT JOIN generating non-deterministic and unexpected result (regression in Spark 3.0)
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-32148
>                 URL: https://issues.apache.org/jira/browse/SPARK-32148
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Michael
>            Assignee: Apache Spark
>            Priority: Blocker
>
> When upgrading from Spark 2.4.6 to 3.0.0, I found that previously working LEFT JOINs now output unexpected results.
> Below is a minimal example to run in {{spark-shell}} that demonstrates this. It puts three events on the left side of the join and two on the right.
> The expected output contains two matched pairs plus one left-side item without a matching right side, which should therefore be emitted with its right-side fields set to {{NULL}}. The join condition is that the event times must be at most 30 seconds apart and the IDs must match.
> {code:scala}
> import spark.implicits._
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.functions.expr
> import org.apache.spark.sql.streaming.OutputMode
> import java.sql.Timestamp
> import java.util.UUID
>
> // Structure of left and right data items
> case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
> case class RightEntry(eventTime: Timestamp, id: String, name: String)
>
> // Some test data
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> val rightData = Vector(
>   RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
>   RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
> )
>
> // Write test data that we will stream from later (random output directories;
> // alternatively we could delete the directories after each run)
> val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
> spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
> val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
> spark.createDataset(rightData).write.format("parquet").save(rightFilePath)
>
> // Read the data back from Parquet as streams
> val leftStream = spark.readStream
>   .schema(Encoders.product[LeftEntry].schema)
>   .parquet(leftFilePath)
>   .withWatermark("eventTime", "2 minutes")
> val rightStream = spark.readStream
>   .schema(Encoders.product[RightEntry].schema)
>   .parquet(rightFilePath)
>   .withWatermark("eventTime", "4 minutes")
>
> // Define the join
> val joinExpression = expr(
>   """
>     |leftStream.id = rightStream.id AND
>     |leftStream.eventTime BETWEEN
>     |  rightStream.eventTime - INTERVAL 30 seconds AND
>     |  rightStream.eventTime + INTERVAL 30 seconds
>   """.stripMargin
> )
> val joinedData = leftStream.as("leftStream")
>   .join(
>     rightStream.as("rightStream"),
>     joinExpression,
>     "left"
>   )
>
> // Run the query
> val query = joinedData.writeStream
>   .format("memory")
>   .queryName("myQuery")
>   .outputMode(OutputMode.Append())
>   .start()
> query.processAllAvailable()
>
> // Print the results
> spark
>   .table(query.name)
>   .show(truncate = false)
> {code}
> When this is executed with Spark 2.4.6, the result is as expected and deterministic:
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|abc|has no join partner|null               |null|null| ← as expected
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}
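> For comparison, the same data and join condition can be sanity-checked as a plain batch query. This is a minimal sketch, not part of the original repro; it reuses the {{leftData}}, {{rightData}} and {{joinExpression}} definitions from above. A batch left join involves no watermarks or streaming state and should always return exactly the three expected rows:
> {code:scala}
> // Batch-mode cross-check (sketch): join the static Datasets directly,
> // bypassing streaming state entirely. Reuses leftData, rightData and
> // joinExpression from the snippet above.
> val batchJoined = spark.createDataset(leftData).as("leftStream")
>   .join(
>     spark.createDataset(rightData).as("rightStream"),
>     joinExpression,
>     "left"
>   )
> batchJoined.show(truncate = false)
> {code}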
> When running the same code snippet with Spark 3.0.0, the result is non-deterministically one of these two:
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 00:00:00|abc|joined with A|null               |null|null| ← this entry was already joined with "A" above,
> +-------------------+---+-------------+-------------------+----+----+   but now appears once more without its right join side
> {noformat}
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 01:00:00|abc|joined with B|null               |null|null| ← this entry was already joined with "B" above,
> +-------------------+---+-------------+-------------------+----+----+   but now appears once more without its right join side
> {noformat}
> ... with {{"has no join partner"}} missing entirely; instead, one of the actually joinable left-side items is repeated without its right-side fields.
> ----
> If the input data is modified so that the non-joinable event additionally has a different ID, then Spark 3.0 generates the correct output:
> {code:scala}
> // [...]
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "ddd", "has no join partner"),
>   // ↑↑↑ changed
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> // [...]
> {code}
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|ddd|has no join partner|null               |null|null|
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}
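> An untested workaround sketch: assuming the regression is tied to the stream-stream join state format that changed with Spark 3.0 (an assumption, not a confirmed root cause), pinning the state format back to version 1 before starting the query might restore the 2.4.x behavior. The setting only applies to queries started from a fresh checkpoint; existing checkpoints keep the version they were created with:
> {code:scala}
> // Assumption: revert to the pre-3.0 join state format. Must be set before
> // a NEW query is started; it has no effect on existing checkpoints.
> spark.conf.set("spark.sql.streaming.join.stateFormatVersion", "1")
> {code}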