[ https://issues.apache.org/jira/browse/SPARK-33259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun reassigned SPARK-33259:
-------------------------------------

    Assignee: L. C. Hsieh

Joining 3 streams results in incorrect output
---------------------------------------------

                 Key: SPARK-33259
                 URL: https://issues.apache.org/jira/browse/SPARK-33259
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.0.1
            Reporter: Michael
            Assignee: L. C. Hsieh
            Priority: Critical
              Labels: correctness

I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) INNER JOIN C) operation. Below is the example code I [posted on Stack Overflow|https://stackoverflow.com/questions/64503539/].

I created a minimal example of "sessions" that have "start" and "end" events and optionally some "metadata".

The script generates two outputs. {{sessionStartsWithMetadata}} results from "start" events that are left-joined with the "metadata" events based on {{sessionId}}. A left join is used because we want an output event even when no corresponding metadata exists.

Additionally, a DataFrame {{endedSessionsWithMetadata}} is created by joining "end" events to the previously created DataFrame. Here an inner join is used, because we only want output once a session has definitely ended.

This code can be executed in {{spark-shell}}:

{code:scala}
import java.sql.Timestamp

import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}
import spark.implicits._

implicit val sqlContext: SQLContext = spark.sqlContext

// Main data processing, regardless of whether it runs as batch or stream processing
def process(
    sessionStartEvents: DataFrame,
    sessionOptionalMetadataEvents: DataFrame,
    sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
  val sessionStartsWithMetadata: DataFrame = sessionStartEvents
    .join(
      sessionOptionalMetadataEvents,
      sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
        sessionStartEvents("sessionStartTimestamp").between(
          sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
          sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
        ),
      "left" // metadata is optional
    )
    .select(
      sessionStartEvents("sessionId"),
      sessionStartEvents("sessionStartTimestamp"),
      sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
    )

  val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
    sessionEndEvents,
    sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
      sessionStartsWithMetadata("sessionStartTimestamp").between(
        sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
        sessionEndEvents("sessionEndTimestamp")
      )
  )

  (sessionStartsWithMetadata, endedSessionsWithMetadata)
}

def streamProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {
  val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionStartEventsStream.addData(sessionStartData)
  val sessionStartEvents: DataFrame = sessionStartEventsStream
    .toDS()
    .toDF("sessionStartTimestamp", "sessionId")
    .withWatermark("sessionStartTimestamp", "1 second")
second") > val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = > MemoryStream[(Timestamp, Int)] > sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata) > val sessionOptionalMetadataEvents: DataFrame = > sessionOptionalMetadataEventsStream > .toDS() > .toDF("sessionOptionalMetadataTimestamp", "sessionId") > .withWatermark("sessionOptionalMetadataTimestamp", "1 second") > val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = > MemoryStream[(Timestamp, Int)] > sessionEndEventsStream.addData(sessionEndData) > val sessionEndEvents: DataFrame = sessionEndEventsStream > .toDS() > .toDF("sessionEndTimestamp", "sessionId") > .withWatermark("sessionEndTimestamp", "1 second") > val (sessionStartsWithMetadata, endedSessionsWithMetadata) = > process(sessionStartEvents, sessionOptionalMetadataEvents, > sessionEndEvents) > val sessionStartsWithMetadataQuery = sessionStartsWithMetadata > .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see > which query's output it is > .writeStream > .outputMode("append") > .format("console") > .option("truncate", "false") > .option("numRows", "1000") > .start() > val endedSessionsWithMetadataQuery = endedSessionsWithMetadata > .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see > which query's output it is > .writeStream > .outputMode("append") > .format("console") > .option("truncate", "false") > .option("numRows", "1000") > .start() > (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) > } > def batchProcessing( > sessionStartData: Seq[(Timestamp, Int)], > sessionOptionalMetadata: Seq[(Timestamp, Int)], > sessionEndData: Seq[(Timestamp, Int)] > ): Unit = { > val sessionStartEvents = > spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", > "sessionId") > val sessionOptionalMetadataEvents = > spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", > "sessionId") > val sessionEndEvents = > spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId") > val (sessionStartsWithMetadata, endedSessionsWithMetadata) = > process(sessionStartEvents, sessionOptionalMetadataEvents, > sessionEndEvents) > println("sessionStartsWithMetadata") > sessionStartsWithMetadata.show(100, truncate = false) > println("endedSessionsWithMetadata") > endedSessionsWithMetadata.show(100, truncate = false) > } > // Data is represented as tuples of (eventTime, sessionId)... > val sessionStartData = Vector( > (new Timestamp(1), 0), > (new Timestamp(2000), 1), > (new Timestamp(2000), 2), > (new Timestamp(20000), 10) > ) > val sessionOptionalMetadata = Vector( > (new Timestamp(1), 0), > // session `1` has no metadata > (new Timestamp(2000), 2), > (new Timestamp(20000), 10) > ) > val sessionEndData = Vector( > (new Timestamp(10000), 0), > (new Timestamp(11000), 1), > (new Timestamp(12000), 2), > (new Timestamp(30000), 10) > ) > batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData) > val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) = > streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData) > {code} > In the example session with ID {{1}} has no metadata, so the respective > metadata column is {{null}}. > The main functionality of joining the data is implemented in {{def > process(…)}}, which is called using both batch data and stream data. 
In the batch version the output is as expected:

{noformat}
sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
|1        |1970-01-01 01:00:02    |null                            | ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
+---------+-----------------------+--------------------------------+

endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
|1        |1970-01-01 01:00:02    |null                            |1970-01-01 01:00:11|1        | ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
+---------+-----------------------+--------------------------------+-------------------+---------+
{noformat}

But when the same processing is run as stream processing, the output of {{endedSessionsWithMetadata}} does not contain the entry of session {{1}} that has no metadata:

{noformat}
-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
|sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
+-------------------------+---------+-----------------------+--------------------------------+

-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
|endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+

-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null                            | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+

-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+

↳ ✘ Here I would have expected a row with sessionId=1 that has "start" and "end" information but no "metadata". ✘
{noformat}

In a response it was suggested that the issue looks related to [~kabhwan]'s [mailing list post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html], but since I couldn't find a ticket here tracking the above-mentioned issue, I'm creating this one.
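For completeness, the streaming results could also be captured with the memory sink instead of the console sink, so they can be compared with the batch output programmatically rather than by reading stdout. This is only a sketch, assuming the two console-sink queries inside {{streamProcessing}} are replaced accordingly; the query names {{startsStream}} and {{endedStream}} are illustrative placeholders, while {{format("memory")}}, {{queryName(...)}} and {{processAllAvailable()}} are standard Structured Streaming APIs:

{code:scala}
// Sketch: write each streaming result to an in-memory table instead of the console.
// The query names below are illustrative placeholders.
val startsQuery = sessionStartsWithMetadata
  .writeStream
  .outputMode("append")
  .format("memory")
  .queryName("startsStream")
  .start()

val endedQuery = endedSessionsWithMetadata
  .writeStream
  .outputMode("append")
  .format("memory")
  .queryName("endedStream")
  .start()

// Wait until everything buffered in the MemoryStreams has been processed,
// then the in-memory tables can be compared with the batch DataFrames.
startsQuery.processAllAvailable()
endedQuery.processAllAvailable()

spark.table("startsStream").orderBy("sessionId").show(truncate = false)
spark.table("endedStream").orderBy("sessionId").show(truncate = false)
{code}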