[ https://issues.apache.org/jira/browse/SPARK-33259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael updated SPARK-33259: ---------------------------- Description: I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) JOIN C) operation. Below you can see example code I [posted on Stackoverflow|https://stackoverflow.com/questions/64503539/]... I created a minimal example of "sessions" that have "start" and "end" events and optionally some "metadata". The script generates two outputs: {{sessionStartsWithMetadata}} results from "start" events that are left-joined with the "metadata" events, based on {{sessionId}}. A "left join" is used, since we would like to get an output event even when no corresponding metadata exists. Additionally, a DataFrame {{endedSessionsWithMetadata}} is created by joining "end" events to the previously created DataFrame. Here an "inner join" is used, since we only want some output when a session has ended for sure. This code can be executed in {{spark-shell}}: {code:scala} import java.sql.Timestamp import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.functions.{col, expr, lit} import spark.implicits._ implicit val sqlContext: SQLContext = spark.sqlContext // Main data processing, regardless whether batch or stream processing def process( sessionStartEvents: DataFrame, sessionOptionalMetadataEvents: DataFrame, sessionEndEvents: DataFrame ): (DataFrame, DataFrame) = { val sessionStartsWithMetadata: DataFrame = sessionStartEvents .join( sessionOptionalMetadataEvents, sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") && sessionStartEvents("sessionStartTimestamp").between( sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")), sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds")) ), "left" // metadata is optional ) .select( 
sessionStartEvents("sessionId"), sessionStartEvents("sessionStartTimestamp"), sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp") ) val endedSessionsWithMetadata = sessionStartsWithMetadata.join( sessionEndEvents, sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") && sessionStartsWithMetadata("sessionStartTimestamp").between( sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")), sessionEndEvents("sessionEndTimestamp") ) ) (sessionStartsWithMetadata, endedSessionsWithMetadata) } def streamProcessing( sessionStartData: Seq[(Timestamp, Int)], sessionOptionalMetadata: Seq[(Timestamp, Int)], sessionEndData: Seq[(Timestamp, Int)] ): (StreamingQuery, StreamingQuery) = { val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)] sessionStartEventsStream.addData(sessionStartData) val sessionStartEvents: DataFrame = sessionStartEventsStream .toDS() .toDF("sessionStartTimestamp", "sessionId") .withWatermark("sessionStartTimestamp", "1 second") val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)] sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata) val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream .toDS() .toDF("sessionOptionalMetadataTimestamp", "sessionId") .withWatermark("sessionOptionalMetadataTimestamp", "1 second") val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)] sessionEndEventsStream.addData(sessionEndData) val sessionEndEvents: DataFrame = sessionEndEventsStream .toDS() .toDF("sessionEndTimestamp", "sessionId") .withWatermark("sessionEndTimestamp", "1 second") val (sessionStartsWithMetadata, endedSessionsWithMetadata) = process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents) val sessionStartsWithMetadataQuery = sessionStartsWithMetadata .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which 
query's output it is .writeStream .outputMode("append") .format("console") .option("truncate", "false") .option("numRows", "1000") .start() val endedSessionsWithMetadataQuery = endedSessionsWithMetadata .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is .writeStream .outputMode("append") .format("console") .option("truncate", "false") .option("numRows", "1000") .start() (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) } def batchProcessing( sessionStartData: Seq[(Timestamp, Int)], sessionOptionalMetadata: Seq[(Timestamp, Int)], sessionEndData: Seq[(Timestamp, Int)] ): Unit = { val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId") val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId") val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId") val (sessionStartsWithMetadata, endedSessionsWithMetadata) = process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents) println("sessionStartsWithMetadata") sessionStartsWithMetadata.show(100, truncate = false) println("endedSessionsWithMetadata") endedSessionsWithMetadata.show(100, truncate = false) } // Data is represented as tuples of (eventTime, sessionId)... 
val sessionStartData = Vector( (new Timestamp(1), 0), (new Timestamp(2000), 1), (new Timestamp(2000), 2), (new Timestamp(20000), 10) ) val sessionOptionalMetadata = Vector( (new Timestamp(1), 0), // session `1` has no metadata (new Timestamp(2000), 2), (new Timestamp(20000), 10) ) val sessionEndData = Vector( (new Timestamp(10000), 0), (new Timestamp(11000), 1), (new Timestamp(12000), 2), (new Timestamp(30000), 10) ) batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData) val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) = streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData) {code} In the example, the session with ID {{1}} has no metadata, so the respective metadata column is {{null}}. The main functionality of joining the data is implemented in {{def process(…)}}, which is called using both batch data and stream data. In the batch version, the output is as expected: {noformat} sessionStartsWithMetadata +---------+-----------------------+--------------------------------+ |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp| +---------+-----------------------+--------------------------------+ |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 | |1 |1970-01-01 01:00:02 |null | ← has no metadata ✔ |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 | |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 | +---------+-----------------------+--------------------------------+ endedSessionsWithMetadata +---------+-----------------------+--------------------------------+-------------------+---------+ |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| +---------+-----------------------+--------------------------------+-------------------+---------+ |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 | |1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔ |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 | |10 
|1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 | +---------+-----------------------+--------------------------------+-------------------+---------+ {noformat} But when the same processing is run as stream processing the output of {{endedSessionsWithMetadata}} does not contain the entry of session {{1}} that has no metadata: {noformat} ------------------------------------------- Batch: 0 ("start event") ------------------------------------------- +-------------------------+---------+-----------------------+--------------------------------+ |sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp| +-------------------------+---------+-----------------------+--------------------------------+ |sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 | |sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 | |sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 | +-------------------------+---------+-----------------------+--------------------------------+ ------------------------------------------- Batch: 0 ("end event") ------------------------------------------- +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ |endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ |endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 | |endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 | |endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 | +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ 
------------------------------------------- Batch: 1 ("start event") ------------------------------------------- +-------------------------+---------+---------------------+--------------------------------+ |sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp| +-------------------------+---------+---------------------+--------------------------------+ |sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔ +-------------------------+---------+---------------------+--------------------------------+ ------------------------------------------- Batch: 1 ("end event") ------------------------------------------- +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ |endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘ {noformat} In a response it was suggested the issue looks related to [~kabhwan]'s [mailing list post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html], but since I couldn't find a ticket here tracking the above mentioned issue, I'm creating this one. was: I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) JOIN C) operation. Below you can see example code I [posted on Stackoverflow|https://stackoverflow.com/questions/64503539/]... I created a minimal example of "sessions", that have "start" and "end" events and optionally some "metadata". 
The script generates two outputs: {{sessionStartsWithMetadata}} result from "start" events that are left-joined with the "metadata" events, based on {{sessionId}}. A "left join" is used, since we like to get an output event even when no corresponding metadata exists. Additionally a DataFrame {{endedSessionsWithMetadata}} is created by joining "end" events to the previously created DataFrame. Here an "inner join" is used, since we only want some output when a session has ended for sure. This code can be executed in {{spark-shell}}: {code:scala} import java.sql.Timestamp import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.functions.{col, expr, lit} import spark.implicits._ implicit val sqlContext: SQLContext = spark.sqlContext // Main data processing, regardless whether batch or stream processing def process( sessionStartEvents: DataFrame, sessionOptionalMetadataEvents: DataFrame, sessionEndEvents: DataFrame ): (DataFrame, DataFrame) = { val sessionStartsWithMetadata: DataFrame = sessionStartEvents .join( sessionOptionalMetadataEvents, sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") && sessionStartEvents("sessionStartTimestamp").between( sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")), sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds")) ), "left" // metadata is optional ) .select( sessionStartEvents("sessionId"), sessionStartEvents("sessionStartTimestamp"), sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp") ) val endedSessionsWithMetadata = sessionStartsWithMetadata.join( sessionEndEvents, sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") && sessionStartsWithMetadata("sessionStartTimestamp").between( 
sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")), sessionEndEvents("sessionEndTimestamp") ) ) (sessionStartsWithMetadata, endedSessionsWithMetadata) } def streamProcessing( sessionStartData: Seq[(Timestamp, Int)], sessionOptionalMetadata: Seq[(Timestamp, Int)], sessionEndData: Seq[(Timestamp, Int)] ): (StreamingQuery, StreamingQuery) = { val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)] sessionStartEventsStream.addData(sessionStartData) val sessionStartEvents: DataFrame = sessionStartEventsStream .toDS() .toDF("sessionStartTimestamp", "sessionId") .withWatermark("sessionStartTimestamp", "1 second") val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)] sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata) val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream .toDS() .toDF("sessionOptionalMetadataTimestamp", "sessionId") .withWatermark("sessionOptionalMetadataTimestamp", "1 second") val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)] sessionEndEventsStream.addData(sessionEndData) val sessionEndEvents: DataFrame = sessionEndEventsStream .toDS() .toDF("sessionEndTimestamp", "sessionId") .withWatermark("sessionEndTimestamp", "1 second") val (sessionStartsWithMetadata, endedSessionsWithMetadata) = process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents) val sessionStartsWithMetadataQuery = sessionStartsWithMetadata .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is .writeStream .outputMode("append") .format("console") .option("truncate", "false") .option("numRows", "1000") .start() val endedSessionsWithMetadataQuery = endedSessionsWithMetadata .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is .writeStream .outputMode("append") .format("console") 
.option("truncate", "false") .option("numRows", "1000") .start() (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) } def batchProcessing( sessionStartData: Seq[(Timestamp, Int)], sessionOptionalMetadata: Seq[(Timestamp, Int)], sessionEndData: Seq[(Timestamp, Int)] ): Unit = { val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId") val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId") val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId") val (sessionStartsWithMetadata, endedSessionsWithMetadata) = process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents) println("sessionStartsWithMetadata") sessionStartsWithMetadata.show(100, truncate = false) println("endedSessionsWithMetadata") endedSessionsWithMetadata.show(100, truncate = false) } // Data is represented as tuples of (eventTime, sessionId)... val sessionStartData = Vector( (new Timestamp(1), 0), (new Timestamp(2000), 1), (new Timestamp(2000), 2), (new Timestamp(20000), 10) ) val sessionOptionalMetadata = Vector( (new Timestamp(1), 0), // session `1` has no metadata (new Timestamp(2000), 2), (new Timestamp(20000), 10) ) val sessionEndData = Vector( (new Timestamp(10000), 0), (new Timestamp(11000), 1), (new Timestamp(12000), 2), (new Timestamp(30000), 10) ) batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData) val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) = streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData) {code} In the example session with ID {{1}} has no metadata, so the respective metadata column is {{null}}. The main functionality of joining the data is implemented in {{def process(…)}}, which is called using both batch data and stream data. 
In the batch version the output is as expected: {noformat} sessionStartsWithMetadata +---------+-----------------------+--------------------------------+ |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp| +---------+-----------------------+--------------------------------+ |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 | |1 |1970-01-01 01:00:02 |null | ← has no metadata ✔ |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 | |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 | +---------+-----------------------+--------------------------------+ endedSessionsWithMetadata +---------+-----------------------+--------------------------------+-------------------+---------+ |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| +---------+-----------------------+--------------------------------+-------------------+---------+ |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 | |1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔ |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 | |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 | +---------+-----------------------+--------------------------------+-------------------+---------+ {noformat} But when the same processing is run as stream processing the output of {{endedSessionsWithMetadata}} does not contain the entry of session {{1}} that has no metadata: {noformat} ------------------------------------------- Batch: 0 ("start event") ------------------------------------------- +-------------------------+---------+-----------------------+--------------------------------+ |sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp| +-------------------------+---------+-----------------------+--------------------------------+ |sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 | |sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 
01:00:02 | |sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 | +-------------------------+---------+-----------------------+--------------------------------+ ------------------------------------------- Batch: 0 ("end event") ------------------------------------------- +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ |endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ |endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 | |endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 | |endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 | +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ ------------------------------------------- Batch: 1 ("start event") ------------------------------------------- +-------------------------+---------+---------------------+--------------------------------+ |sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp| +-------------------------+---------+---------------------+--------------------------------+ |sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔ +-------------------------+---------+---------------------+--------------------------------+ ------------------------------------------- Batch: 1 ("end event") ------------------------------------------- +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ 
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘ {noformat} In a response it was suggested the issue looks related to [~ [mailing list post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html], but since I couldn't find a ticket here tracking the above mentioned issue, I'm creating this one. > Joining 3 streams results in incorrect output > --------------------------------------------- > > Key: SPARK-33259 > URL: https://issues.apache.org/jira/browse/SPARK-33259 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 3.0.1 > Reporter: Michael > Priority: Major > > I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN > B) JOIN C) operation. Below you can see example code I [posted on > Stackoverflow|https://stackoverflow.com/questions/64503539/]... > I created a minimal example of "sessions" that have "start" and "end" events > and optionally some "metadata". > The script generates two outputs: {{sessionStartsWithMetadata}} results from > "start" events that are left-joined with the "metadata" events, based on > {{sessionId}}. A "left join" is used, since we would like to get an output event > even when no corresponding metadata exists. > Additionally, a DataFrame {{endedSessionsWithMetadata}} is created by joining > "end" events to the previously created DataFrame. Here an "inner join" is > used, since we only want some output when a session has ended for sure. 
> This code can be executed in {{spark-shell}}: > {code:scala} > import java.sql.Timestamp > import org.apache.spark.sql.execution.streaming.{MemoryStream, > StreamingQueryWrapper} > import org.apache.spark.sql.streaming.StreamingQuery > import org.apache.spark.sql.{DataFrame, SQLContext} > import org.apache.spark.sql.functions.{col, expr, lit} > import spark.implicits._ > implicit val sqlContext: SQLContext = spark.sqlContext > // Main data processing, regardless whether batch or stream processing > def process( > sessionStartEvents: DataFrame, > sessionOptionalMetadataEvents: DataFrame, > sessionEndEvents: DataFrame > ): (DataFrame, DataFrame) = { > val sessionStartsWithMetadata: DataFrame = sessionStartEvents > .join( > sessionOptionalMetadataEvents, > sessionStartEvents("sessionId") === > sessionOptionalMetadataEvents("sessionId") && > sessionStartEvents("sessionStartTimestamp").between( > > sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL > 1 seconds")), > > sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL > 1 seconds")) > ), > "left" // metadata is optional > ) > .select( > sessionStartEvents("sessionId"), > sessionStartEvents("sessionStartTimestamp"), > sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp") > ) > val endedSessionsWithMetadata = sessionStartsWithMetadata.join( > sessionEndEvents, > sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") > && > sessionStartsWithMetadata("sessionStartTimestamp").between( > sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 > seconds")), > sessionEndEvents("sessionEndTimestamp") > ) > ) > (sessionStartsWithMetadata, endedSessionsWithMetadata) > } > def streamProcessing( > sessionStartData: Seq[(Timestamp, Int)], > sessionOptionalMetadata: Seq[(Timestamp, Int)], > sessionEndData: Seq[(Timestamp, Int)] > ): (StreamingQuery, StreamingQuery) = { > val sessionStartEventsStream: 
MemoryStream[(Timestamp, Int)] = > MemoryStream[(Timestamp, Int)] > sessionStartEventsStream.addData(sessionStartData) > val sessionStartEvents: DataFrame = sessionStartEventsStream > .toDS() > .toDF("sessionStartTimestamp", "sessionId") > .withWatermark("sessionStartTimestamp", "1 second") > val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = > MemoryStream[(Timestamp, Int)] > sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata) > val sessionOptionalMetadataEvents: DataFrame = > sessionOptionalMetadataEventsStream > .toDS() > .toDF("sessionOptionalMetadataTimestamp", "sessionId") > .withWatermark("sessionOptionalMetadataTimestamp", "1 second") > val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = > MemoryStream[(Timestamp, Int)] > sessionEndEventsStream.addData(sessionEndData) > val sessionEndEvents: DataFrame = sessionEndEventsStream > .toDS() > .toDF("sessionEndTimestamp", "sessionId") > .withWatermark("sessionEndTimestamp", "1 second") > val (sessionStartsWithMetadata, endedSessionsWithMetadata) = > process(sessionStartEvents, sessionOptionalMetadataEvents, > sessionEndEvents) > val sessionStartsWithMetadataQuery = sessionStartsWithMetadata > .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see > which query's output it is > .writeStream > .outputMode("append") > .format("console") > .option("truncate", "false") > .option("numRows", "1000") > .start() > val endedSessionsWithMetadataQuery = endedSessionsWithMetadata > .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see > which query's output it is > .writeStream > .outputMode("append") > .format("console") > .option("truncate", "false") > .option("numRows", "1000") > .start() > (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) > } > def batchProcessing( > sessionStartData: Seq[(Timestamp, Int)], > sessionOptionalMetadata: Seq[(Timestamp, Int)], > sessionEndData: Seq[(Timestamp, Int)] > ): Unit = { > val 
sessionStartEvents = > spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", > "sessionId") > val sessionOptionalMetadataEvents = > spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", > "sessionId") > val sessionEndEvents = > spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId") > val (sessionStartsWithMetadata, endedSessionsWithMetadata) = > process(sessionStartEvents, sessionOptionalMetadataEvents, > sessionEndEvents) > println("sessionStartsWithMetadata") > sessionStartsWithMetadata.show(100, truncate = false) > println("endedSessionsWithMetadata") > endedSessionsWithMetadata.show(100, truncate = false) > } > // Data is represented as tuples of (eventTime, sessionId)... > val sessionStartData = Vector( > (new Timestamp(1), 0), > (new Timestamp(2000), 1), > (new Timestamp(2000), 2), > (new Timestamp(20000), 10) > ) > val sessionOptionalMetadata = Vector( > (new Timestamp(1), 0), > // session `1` has no metadata > (new Timestamp(2000), 2), > (new Timestamp(20000), 10) > ) > val sessionEndData = Vector( > (new Timestamp(10000), 0), > (new Timestamp(11000), 1), > (new Timestamp(12000), 2), > (new Timestamp(30000), 10) > ) > batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData) > val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) = > streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData) > {code} > In the example, the session with ID {{1}} has no metadata, so the respective > metadata column is {{null}}. > The main functionality of joining the data is implemented in {{def > process(…)}}, which is called using both batch data and stream data. 
> In the batch version the output is as expected: > {noformat} > sessionStartsWithMetadata > +---------+-----------------------+--------------------------------+ > |sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp| > +---------+-----------------------+--------------------------------+ > |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 | > |1 |1970-01-01 01:00:02 |null | ← has no > metadata ✔ > |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 | > |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 | > +---------+-----------------------+--------------------------------+ > endedSessionsWithMetadata > +---------+-----------------------+--------------------------------+-------------------+---------+ > |sessionId|sessionStartTimestamp > |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| > +---------+-----------------------+--------------------------------+-------------------+---------+ > |0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 > |1970-01-01 01:00:10|0 | > |1 |1970-01-01 01:00:02 |null > |1970-01-01 01:00:11|1 | ← has no metadata ✔ > |2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 > |1970-01-01 01:00:12|2 | > |10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 > |1970-01-01 01:00:30|10 | > +---------+-----------------------+--------------------------------+-------------------+---------+ > {noformat} > But when the same processing is run as stream processing the output of > {{endedSessionsWithMetadata}} does not contain the entry of session {{1}} > that has no metadata: > {noformat} > ------------------------------------------- > Batch: 0 ("start event") > ------------------------------------------- > +-------------------------+---------+-----------------------+--------------------------------+ > |sessionStartsWithMetadata|sessionId|sessionStartTimestamp > |sessionOptionalMetadataTimestamp| > +-------------------------+---------+-----------------------+--------------------------------+ > |sessionStartsWithMetadata|10 |1970-01-01 01:00:20 
|1970-01-01 > 01:00:20 | > |sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 > 01:00:02 | > |sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 > 01:00:00.001 | > +-------------------------+---------+-----------------------+--------------------------------+ > ------------------------------------------- > Batch: 0 ("end event") > ------------------------------------------- > +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ > |endedSessionsWithMetadata|sessionId|sessionStartTimestamp > |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| > +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ > |endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 > 01:00:20 |1970-01-01 01:00:30|10 | > |endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 > 01:00:02 |1970-01-01 01:00:12|2 | > |endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 > 01:00:00.001 |1970-01-01 01:00:10|0 | > +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+ > ------------------------------------------- > Batch: 1 ("start event") > ------------------------------------------- > +-------------------------+---------+---------------------+--------------------------------+ > |sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp| > +-------------------------+---------+---------------------+--------------------------------+ > |sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null > | ← has no metadata ✔ > +-------------------------+---------+---------------------+--------------------------------+ > ------------------------------------------- > Batch: 1 ("end event") > ------------------------------------------- > 
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ > |endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId| > +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ > +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+ > ↳ ✘ here I would have expected a line with sessionId=1, that has "start" > and "end" information, but no "metadata" ✘ > {noformat} > In a response it was suggested the issue looks related to [~kabhwan]'s > [mailing list > post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html], > but since I couldn't find a ticket here tracking the above mentioned issue, > I'm creating this one. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org