[ 
https://issues.apache.org/jira/browse/SPARK-33259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael updated SPARK-33259:
----------------------------
    Description: 
I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) 
JOIN C) operation. Below you can see example code I [posted on 
Stackoverflow|https://stackoverflow.com/questions/64503539/]...

I created a minimal example of "sessions", that have "start" and "end" events 
and optionally some "metadata".

The script generates two outputs: {{sessionStartsWithMetadata}} result from 
"start" events that are left-joined with the "metadata" events, based on 
{{sessionId}}. A "left join" is used, since we like to get an output event even 
when no corresponding metadata exists.

Additionally a DataFrame {{endedSessionsWithMetadata}} is created by joining 
"end" events to the previously created DataFrame. Here an "inner join" is used, 
since we only want some output when a session has ended for sure.

This code can be executed in {{spark-shell}}:
{code:scala}
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}

import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// Main data processing, regardless whether batch or stream processing
def process(
    sessionStartEvents: DataFrame,
    sessionOptionalMetadataEvents: DataFrame,
    sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
  val sessionStartsWithMetadata: DataFrame = sessionStartEvents
    .join(
      sessionOptionalMetadataEvents,
      sessionStartEvents("sessionId") === 
sessionOptionalMetadataEvents("sessionId") &&
        sessionStartEvents("sessionStartTimestamp").between(
          
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL
 1 seconds")),
          
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL
 1 seconds"))
        ),
      "left" // metadata is optional
    )
    .select(
      sessionStartEvents("sessionId"),
      sessionStartEvents("sessionStartTimestamp"),
      sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
    )

  val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
    sessionEndEvents,
    sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
      sessionStartsWithMetadata("sessionStartTimestamp").between(
        sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 
seconds")),
        sessionEndEvents("sessionEndTimestamp")
      )
  )

  (sessionStartsWithMetadata, endedSessionsWithMetadata)
}

def streamProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {

  val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = 
MemoryStream[(Timestamp, Int)]
  sessionStartEventsStream.addData(sessionStartData)

  val sessionStartEvents: DataFrame = sessionStartEventsStream
    .toDS()
    .toDF("sessionStartTimestamp", "sessionId")
    .withWatermark("sessionStartTimestamp", "1 second")

  val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = 
MemoryStream[(Timestamp, Int)]
  sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)

  val sessionOptionalMetadataEvents: DataFrame = 
sessionOptionalMetadataEventsStream
    .toDS()
    .toDF("sessionOptionalMetadataTimestamp", "sessionId")
    .withWatermark("sessionOptionalMetadataTimestamp", "1 second")

  val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = 
MemoryStream[(Timestamp, Int)]
  sessionEndEventsStream.addData(sessionEndData)

  val sessionEndEvents: DataFrame = sessionEndEventsStream
    .toDS()
    .toDF("sessionEndTimestamp", "sessionId")
    .withWatermark("sessionEndTimestamp", "1 second")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
    .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see 
which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
    .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see 
which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}

def batchProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): Unit = {

  val sessionStartEvents = 
spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
  val sessionOptionalMetadataEvents = 
spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp",
 "sessionId")
  val sessionEndEvents = 
spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  println("sessionStartsWithMetadata")
  sessionStartsWithMetadata.show(100, truncate = false)

  println("endedSessionsWithMetadata")
  endedSessionsWithMetadata.show(100, truncate = false)
}


// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
  (new Timestamp(1), 0),
  (new Timestamp(2000), 1),
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionOptionalMetadata = Vector(
  (new Timestamp(1), 0),
  // session `1` has no metadata
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionEndData = Vector(
  (new Timestamp(10000), 0),
  (new Timestamp(11000), 1),
  (new Timestamp(12000), 2),
  (new Timestamp(30000), 10)
)

batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
  streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
{code}
In the example session with ID {{1}} has no metadata, so the respective 
metadata column is {{null}}.

The main functionality of joining the data is implemented in {{def 
process(…)}}, which is called using both batch data and stream data.

In the batch version the output is as expected:
{noformat}
sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
|1        |1970-01-01 01:00:02    |null                            | ← has no 
metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
+---------+-----------------------+--------------------------------+

endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp  
|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 
01:00:10|0        |
|1        |1970-01-01 01:00:02    |null                            |1970-01-01 
01:00:11|1        |  ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 
01:00:12|2        |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 
01:00:30|10       |
+---------+-----------------------+--------------------------------+-------------------+---------+
{noformat}
But when the same processing is run as stream processing the output of 
{{endedSessionsWithMetadata}} does not contain the entry of session {{1}} that 
has no metadata:
{noformat}
-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp  
|sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 
01:00:20             |
|sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 
01:00:02             |
|sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 
01:00:00.001         |
+-------------------------+---------+-----------------------+--------------------------------+

-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp  
|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 
01:00:20             |1970-01-01 01:00:30|10       |
|endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 
01:00:02             |1970-01-01 01:00:12|2        |
|endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 
01:00:00.001         |1970-01-01 01:00:10|0        |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+

-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null                 
           | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+

-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
  ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and 
"end" information, but no "metadata" ✘
{noformat}
In a response it was suggested the issue looks related to [~kabhwan]'s [mailing 
list 
post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html],
 but since I couldn't find a ticket here tracking the above mentioned issue, 
I'm creating this one.

  was:
I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) 
JOIN C) operation. Below you can see example code I [posted on 
Stackoverflow|https://stackoverflow.com/questions/64503539/]...

I created a minimal example of "sessions", that have "start" and "end" events 
and optionally some "metadata".

The script generates two outputs: {{sessionStartsWithMetadata}} result from 
"start" events that are left-joined with the "metadata" events, based on 
{{sessionId}}. A "left join" is used, since we like to get an output event even 
when no corresponding metadata exists.

Additionally a DataFrame {{endedSessionsWithMetadata}} is created by joining 
"end" events to the previously created DataFrame. Here an "inner join" is used, 
since we only want some output when a session has ended for sure.

This code can be executed in {{spark-shell}}:
{code:scala}
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}

import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// Main data processing, regardless whether batch or stream processing
def process(
    sessionStartEvents: DataFrame,
    sessionOptionalMetadataEvents: DataFrame,
    sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
  val sessionStartsWithMetadata: DataFrame = sessionStartEvents
    .join(
      sessionOptionalMetadataEvents,
      sessionStartEvents("sessionId") === 
sessionOptionalMetadataEvents("sessionId") &&
        sessionStartEvents("sessionStartTimestamp").between(
          
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL
 1 seconds")),
          
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL
 1 seconds"))
        ),
      "left" // metadata is optional
    )
    .select(
      sessionStartEvents("sessionId"),
      sessionStartEvents("sessionStartTimestamp"),
      sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
    )

  val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
    sessionEndEvents,
    sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
      sessionStartsWithMetadata("sessionStartTimestamp").between(
        sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 
seconds")),
        sessionEndEvents("sessionEndTimestamp")
      )
  )

  (sessionStartsWithMetadata, endedSessionsWithMetadata)
}

def streamProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {

  val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = 
MemoryStream[(Timestamp, Int)]
  sessionStartEventsStream.addData(sessionStartData)

  val sessionStartEvents: DataFrame = sessionStartEventsStream
    .toDS()
    .toDF("sessionStartTimestamp", "sessionId")
    .withWatermark("sessionStartTimestamp", "1 second")

  val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = 
MemoryStream[(Timestamp, Int)]
  sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)

  val sessionOptionalMetadataEvents: DataFrame = 
sessionOptionalMetadataEventsStream
    .toDS()
    .toDF("sessionOptionalMetadataTimestamp", "sessionId")
    .withWatermark("sessionOptionalMetadataTimestamp", "1 second")

  val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = 
MemoryStream[(Timestamp, Int)]
  sessionEndEventsStream.addData(sessionEndData)

  val sessionEndEvents: DataFrame = sessionEndEventsStream
    .toDS()
    .toDF("sessionEndTimestamp", "sessionId")
    .withWatermark("sessionEndTimestamp", "1 second")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
    .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see 
which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
    .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see 
which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}

def batchProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): Unit = {

  val sessionStartEvents = 
spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
  val sessionOptionalMetadataEvents = 
spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp",
 "sessionId")
  val sessionEndEvents = 
spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  println("sessionStartsWithMetadata")
  sessionStartsWithMetadata.show(100, truncate = false)

  println("endedSessionsWithMetadata")
  endedSessionsWithMetadata.show(100, truncate = false)
}


// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
  (new Timestamp(1), 0),
  (new Timestamp(2000), 1),
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionOptionalMetadata = Vector(
  (new Timestamp(1), 0),
  // session `1` has no metadata
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionEndData = Vector(
  (new Timestamp(10000), 0),
  (new Timestamp(11000), 1),
  (new Timestamp(12000), 2),
  (new Timestamp(30000), 10)
)

batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
  streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
{code}
In the example session with ID {{1}} has no metadata, so the respective 
metadata column is {{null}}.

The main functionality of joining the data is implemented in {{def 
process(…)}}, which is called using both batch data and stream data.

In the batch version the output is as expected:
{noformat}
sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
|1        |1970-01-01 01:00:02    |null                            | ← has no 
metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
+---------+-----------------------+--------------------------------+

endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp  
|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 
01:00:10|0        |
|1        |1970-01-01 01:00:02    |null                            |1970-01-01 
01:00:11|1        |  ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 
01:00:12|2        |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 
01:00:30|10       |
+---------+-----------------------+--------------------------------+-------------------+---------+
{noformat}
But when the same processing is run as stream processing the output of 
{{endedSessionsWithMetadata}} does not contain the entry of session {{1}} that 
has no metadata:
{noformat}
-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp  
|sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 
01:00:20             |
|sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 
01:00:02             |
|sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 
01:00:00.001         |
+-------------------------+---------+-----------------------+--------------------------------+

-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp  
|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 
01:00:20             |1970-01-01 01:00:30|10       |
|endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 
01:00:02             |1970-01-01 01:00:12|2        |
|endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 
01:00:00.001         |1970-01-01 01:00:10|0        |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+

-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null                 
           | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+

-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
  ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and 
"end" information, but no "metadata" ✘
{noformat}
In a response it was suggested the issue looks related to [~ [mailing list 
post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html],
 but since I couldn't find a ticket here tracking the above mentioned issue, 
I'm creating this one.


> Joining 3 streams results in incorrect output
> ---------------------------------------------
>
>                 Key: SPARK-33259
>                 URL: https://issues.apache.org/jira/browse/SPARK-33259
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.0.1
>            Reporter: Michael
>            Priority: Major
>
> I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN 
> B) JOIN C) operation. Below you can see example code I [posted on 
> Stackoverflow|https://stackoverflow.com/questions/64503539/]...
> I created a minimal example of "sessions", that have "start" and "end" events 
> and optionally some "metadata".
> The script generates two outputs: {{sessionStartsWithMetadata}} result from 
> "start" events that are left-joined with the "metadata" events, based on 
> {{sessionId}}. A "left join" is used, since we like to get an output event 
> even when no corresponding metadata exists.
> Additionally a DataFrame {{endedSessionsWithMetadata}} is created by joining 
> "end" events to the previously created DataFrame. Here an "inner join" is 
> used, since we only want some output when a session has ended for sure.
> This code can be executed in {{spark-shell}}:
> {code:scala}
> import java.sql.Timestamp
> import org.apache.spark.sql.execution.streaming.{MemoryStream, 
> StreamingQueryWrapper}
> import org.apache.spark.sql.streaming.StreamingQuery
> import org.apache.spark.sql.{DataFrame, SQLContext}
> import org.apache.spark.sql.functions.{col, expr, lit}
> import spark.implicits._
> implicit val sqlContext: SQLContext = spark.sqlContext
> // Main data processing, regardless whether batch or stream processing
> def process(
>     sessionStartEvents: DataFrame,
>     sessionOptionalMetadataEvents: DataFrame,
>     sessionEndEvents: DataFrame
> ): (DataFrame, DataFrame) = {
>   val sessionStartsWithMetadata: DataFrame = sessionStartEvents
>     .join(
>       sessionOptionalMetadataEvents,
>       sessionStartEvents("sessionId") === 
> sessionOptionalMetadataEvents("sessionId") &&
>         sessionStartEvents("sessionStartTimestamp").between(
>           
> sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL
>  1 seconds")),
>           
> sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL
>  1 seconds"))
>         ),
>       "left" // metadata is optional
>     )
>     .select(
>       sessionStartEvents("sessionId"),
>       sessionStartEvents("sessionStartTimestamp"),
>       sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
>     )
>   val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
>     sessionEndEvents,
>     sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") 
> &&
>       sessionStartsWithMetadata("sessionStartTimestamp").between(
>         sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 
> seconds")),
>         sessionEndEvents("sessionEndTimestamp")
>       )
>   )
>   (sessionStartsWithMetadata, endedSessionsWithMetadata)
> }
> def streamProcessing(
>     sessionStartData: Seq[(Timestamp, Int)],
>     sessionOptionalMetadata: Seq[(Timestamp, Int)],
>     sessionEndData: Seq[(Timestamp, Int)]
> ): (StreamingQuery, StreamingQuery) = {
>   val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = 
> MemoryStream[(Timestamp, Int)]
>   sessionStartEventsStream.addData(sessionStartData)
>   val sessionStartEvents: DataFrame = sessionStartEventsStream
>     .toDS()
>     .toDF("sessionStartTimestamp", "sessionId")
>     .withWatermark("sessionStartTimestamp", "1 second")
>   val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = 
> MemoryStream[(Timestamp, Int)]
>   sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)
>   val sessionOptionalMetadataEvents: DataFrame = 
> sessionOptionalMetadataEventsStream
>     .toDS()
>     .toDF("sessionOptionalMetadataTimestamp", "sessionId")
>     .withWatermark("sessionOptionalMetadataTimestamp", "1 second")
>   val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = 
> MemoryStream[(Timestamp, Int)]
>   sessionEndEventsStream.addData(sessionEndData)
>   val sessionEndEvents: DataFrame = sessionEndEventsStream
>     .toDS()
>     .toDF("sessionEndTimestamp", "sessionId")
>     .withWatermark("sessionEndTimestamp", "1 second")
>   val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
>     process(sessionStartEvents, sessionOptionalMetadataEvents, 
> sessionEndEvents)
>   val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
>     .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see 
> which query's output it is
>     .writeStream
>     .outputMode("append")
>     .format("console")
>     .option("truncate", "false")
>     .option("numRows", "1000")
>     .start()
>   val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
>     .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see 
> which query's output it is
>     .writeStream
>     .outputMode("append")
>     .format("console")
>     .option("truncate", "false")
>     .option("numRows", "1000")
>     .start()
>   (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
> }
> def batchProcessing(
>     sessionStartData: Seq[(Timestamp, Int)],
>     sessionOptionalMetadata: Seq[(Timestamp, Int)],
>     sessionEndData: Seq[(Timestamp, Int)]
> ): Unit = {
>   val sessionStartEvents = 
> spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", 
> "sessionId")
>   val sessionOptionalMetadataEvents = 
> spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp",
>  "sessionId")
>   val sessionEndEvents = 
> spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")
>   val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
>     process(sessionStartEvents, sessionOptionalMetadataEvents, 
> sessionEndEvents)
>   println("sessionStartsWithMetadata")
>   sessionStartsWithMetadata.show(100, truncate = false)
>   println("endedSessionsWithMetadata")
>   endedSessionsWithMetadata.show(100, truncate = false)
> }
> // Data is represented as tuples of (eventTime, sessionId)...
> val sessionStartData = Vector(
>   (new Timestamp(1), 0),
>   (new Timestamp(2000), 1),
>   (new Timestamp(2000), 2),
>   (new Timestamp(20000), 10)
> )
> val sessionOptionalMetadata = Vector(
>   (new Timestamp(1), 0),
>   // session `1` has no metadata
>   (new Timestamp(2000), 2),
>   (new Timestamp(20000), 10)
> )
> val sessionEndData = Vector(
>   (new Timestamp(10000), 0),
>   (new Timestamp(11000), 1),
>   (new Timestamp(12000), 2),
>   (new Timestamp(30000), 10)
> )
> batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
> val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
>   streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
> {code}
> In the example session with ID {{1}} has no metadata, so the respective 
> metadata column is {{null}}.
> The main functionality of joining the data is implemented in {{def 
> process(…)}}, which is called using both batch data and stream data.
> In the batch version the output is as expected:
> {noformat}
> sessionStartsWithMetadata
> +---------+-----------------------+--------------------------------+
> |sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
> +---------+-----------------------+--------------------------------+
> |0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
> |1        |1970-01-01 01:00:02    |null                            | ← has no 
> metadata ✔
> |2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
> |10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
> +---------+-----------------------+--------------------------------+
> endedSessionsWithMetadata
> +---------+-----------------------+--------------------------------+-------------------+---------+
> |sessionId|sessionStartTimestamp  
> |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +---------+-----------------------+--------------------------------+-------------------+---------+
> |0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         
> |1970-01-01 01:00:10|0        |
> |1        |1970-01-01 01:00:02    |null                            
> |1970-01-01 01:00:11|1        |  ← has no metadata ✔
> |2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             
> |1970-01-01 01:00:12|2        |
> |10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             
> |1970-01-01 01:00:30|10       |
> +---------+-----------------------+--------------------------------+-------------------+---------+
> {noformat}
> But when the same processing is run as stream processing the output of 
> {{endedSessionsWithMetadata}} does not contain the entry of session {{1}} 
> that has no metadata:
> {noformat}
> -------------------------------------------
> Batch: 0 ("start event")
> -------------------------------------------
> +-------------------------+---------+-----------------------+--------------------------------+
> |sessionStartsWithMetadata|sessionId|sessionStartTimestamp  
> |sessionOptionalMetadataTimestamp|
> +-------------------------+---------+-----------------------+--------------------------------+
> |sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 
> 01:00:20             |
> |sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 
> 01:00:02             |
> |sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 
> 01:00:00.001         |
> +-------------------------+---------+-----------------------+--------------------------------+
> -------------------------------------------
> Batch: 0 ("end event")
> -------------------------------------------
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|sessionId|sessionStartTimestamp  
> |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 
> 01:00:20             |1970-01-01 01:00:30|10       |
> |endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 
> 01:00:02             |1970-01-01 01:00:12|2        |
> |endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 
> 01:00:00.001         |1970-01-01 01:00:10|0        |
> +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
> -------------------------------------------
> Batch: 1 ("start event")
> -------------------------------------------
> +-------------------------+---------+---------------------+--------------------------------+
> |sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
> +-------------------------+---------+---------------------+--------------------------------+
> |sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null               
>              | ← has no metadata ✔
> +-------------------------+---------+---------------------+--------------------------------+
> -------------------------------------------
> Batch: 1 ("end event")
> -------------------------------------------
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
> |endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
> +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
>   ↳ ✘ here I would have expected a line with sessionId=1, that has "start" 
> and "end" information, but no "metadata" ✘
> {noformat}
> In a response it was suggested the issue looks related to [~kabhwan]'s 
> [mailing list 
> post|http://apache-spark-developers-list.1001551.n3.nabble.com/correctness-issue-on-chained-streaming-streaming-join-td27358.html],
>  but since I couldn't find a ticket here tracking the above mentioned issue, 
> I'm creating this one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to