[ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haripriya updated SPARK-26154: ------------------------------ Description: Stream-stream joins using left outer join gives inconsistent output The data processed once, is being processed again and gives null value. In Batch 3, the input data "3" is processed. But again in batch 6, null value is provided for same data {code:java} scala> import org.apache.spark.sql.functions.{col, expr} import org.apache.spark.sql.functions.{col, expr} scala> import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.streaming.Trigger scala> val lines_stream1 = spark.readStream. | format("kafka"). | option("kafka.bootstrap.servers", "ip:9092"). | option("subscribe", "topic1"). | option("includeTimestamp", true). | load(). | selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)]. | select(col("value") as("data"),col("timestamp") as("recordTime")). | select("data","recordTime"). | withWatermark("recordTime", "5 seconds ") lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp] scala> val lines_stream2 = spark.readStream. | format("kafka"). | option("kafka.bootstrap.servers", "ip:9092"). | option("subscribe", "topic2"). | option("includeTimestamp", value = true). | load(). | selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)]. | select(col("value") as("data1"),col("timestamp") as("recordTime1")). | select("data1","recordTime1"). | withWatermark("recordTime1", "10 seconds ") lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp] scala> val query = lines_stream1.join(lines_stream2, expr ( | """ | | data == data1 and | | recordTime1 >= recordTime and | | recordTime1 <= recordTime + interval 5 seconds | """.stripMargin),"left"). | writeStream. | option("truncate","false"). | outputMode("append"). | format("console").option("checkpointLocation", "/tmp/leftouter/"). 
| trigger(Trigger.ProcessingTime ("5 seconds")). | start() query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b {code} Step2 : Start producing data kafka-console-producer.sh --broker-list ip:9092 --topic topic1 >1 >2 >3 >4 >5 >aa >bb >cc kafka-console-producer.sh --broker-list ip:9092 --topic topic2 >2 >2 >3 >4 >5 >aa >cc >ee >ee Output obtained: Batch: 0 ------------------------------------------- +-----+---------++----------------+ |data|recordTime|data1|recordTime1| +-----+---------++----------------+ +-----+---------++----------------+ ------------------------------------------- Batch: 1 ------------------------------------------- +-----+---------++----------------+ |data|recordTime|data1|recordTime1| +-----+---------++----------------+ +-----+---------++----------------+ ------------------------------------------- Batch: 2 ------------------------------------------- +-----+----------------------++----------------------------+ |data|recordTime|data1|recordTime1| +-----+----------------------++----------------------------+ |3|2018-11-22 20:09:35.053|3|2018-11-22 20:09:36.506| |2|2018-11-22 20:09:31.613|2|2018-11-22 20:09:33.116| +-----+----------------------++----------------------------+ ------------------------------------------- Batch: 3 ------------------------------------------- +-----+----------------------++----------------------------+ |data|recordTime|data1|recordTime1| +-----+----------------------++----------------------------+ |4|2018-11-22 20:09:38.654|4|2018-11-22 20:09:39.818| +-----+----------------------++----------------------------+ ------------------------------------------- Batch: 4 ------------------------------------------- +-----+----------------------++----------------------------+ |data|recordTime|data1|recordTime1| +-----+----------------------++----------------------------+ |5|2018-11-22 20:09:44.809|5|2018-11-22 20:09:47.452| |1|2018-11-22 
20:09:22.662|null|null| +-----+----------------------++----------------------------+ ------------------------------------------- Batch: 5 ------------------------------------------- +-----+----------------------++----------------------------+ |data|recordTime|data1|recordTime1| +-----+----------------------++----------------------------+ |cc|2018-11-22 20:10:06.654|cc|2018-11-22 20:10:08.701| |aa|2018-11-22 20:10:01.536|aa|2018-11-22 20:10:03.259| +-----+----------------------++----------------------------+ ------------------------------------------- Batch: 6 ------------------------------------------- +-----+----------------------++----------------+ |data|recordTime|data1|recordTime1| +-----+----------------------++----------------+ |3|2018-11-22 20:09:35.053|null|null| +-----+----------------------++----------------+ was: Stream-stream joins using left outer join gives inconsistent output The data processed once, is being processed again and gives null value. In Batch 2, the input data "abc" is processed. But again in batch 5, null value is provided for same data Steps: In spark-shell scala> import org.apache.spark.sql.functions.\{col, expr} import org.apache.spark.sql.functions.\{col, expr} scala> import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.streaming.Trigger scala> val lines_stream1 = spark.readStream. | format("kafka"). | option("kafka.bootstrap.servers", "ip:9092"). | option("subscribe", "topic1"). | option("includeTimestamp", true). | load(). | selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)]. | select(col("value") as("data"),col("timestamp") as("recordTime")). | select("data","recordTime"). | withWatermark("recordTime", "5 seconds ") lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp] scala> val lines_stream2 = spark.readStream. | format("kafka"). | option("kafka.bootstrap.servers", "ip:9092"). | option("subscribe", "topic2"). 
| option("includeTimestamp", value = true). | load(). | selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)]. | select(col("value") as("data1"),col("timestamp") as("recordTime1")). | select("data1","recordTime1"). | withWatermark("recordTime1", "10 seconds ") lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp] scala> val query = lines_stream1.join(lines_stream2, expr ( | """ | | data == data1 and | | recordTime1 >= recordTime and | | recordTime1 <= recordTime + interval 5 seconds | """.stripMargin),"left"). | writeStream. | option("truncate","false"). | outputMode("append"). | format("console").option("checkpointLocation", "/tmp/leftouter/"). | trigger(Trigger.ProcessingTime ("5 seconds")). | start() query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b Step2 : Start producing data bin/kafka-console-producer.sh --broker-list ip:9092 --topic topic1 >abc >def >ghi >123 >123 >234 >345 /kafka-console-producer.sh --broker-list ip:9092 --topic topic2 >abc >ghi >123 >345 >234 >678 Output obtained: ------------------------------------------- Batch: 0 ------------------------------------------- +----+----------+-----+-----------+ |data|recordTime|data1|recordTime1| +----+----------+-----+-----------+ +----+----------+-----+-----------+ ------------------------------------------- Batch: 1 ------------------------------------------- +----+----------+-----+-----------+ |data|recordTime|data1|recordTime1| +----+----------+-----+-----------+ +----+----------+-----+-----------+ ------------------------------------------- Batch: 2 ------------------------------------------- +----+-----------------------+-----+-----------------------+ |data|recordTime |data1|recordTime1 | +----+-----------------------+-----+-----------------------+ |ghi |2018-11-02 10:52:26.072|ghi |2018-11-02 10:52:28.309| |abc |2018-11-02 
10:52:18.627|abc |2018-11-02 10:52:22.249| +----+-----------------------+-----+-----------------------+ Batch: 3 ------------------------------------------- +----+-----------------------+-----+-----------------------+ |data|recordTime |data1|recordTime1 | +----+-----------------------+-----+-----------------------+ |123 |2018-11-02 10:52:31.062|123 |2018-11-02 10:52:33.094| +----+-----------------------+-----+-----------------------+ ------------------------------------------- Batch: 4 ------------------------------------------- +----+-----------------------+-----+-----------------------+ |data|recordTime |data1|recordTime1 | +----+-----------------------+-----+-----------------------+ |345 |2018-11-02 10:52:41.252|345 |2018-11-02 10:52:44.178| +----+-----------------------+-----+-----------------------+ ------------------------------------------- Batch: 5 ------------------------------------------- +----+-----------------------+-----+-----------------------+ |data|recordTime |data1|recordTime1 | +----+-----------------------+-----+-----------------------+ |678 |2018-11-02 10:53:04.116|678 |2018-11-02 10:53:06.275| |abc |2018-11-02 10:52:18.627|null |null | |def |2018-11-02 10:52:24.296|null |null | +----+-----------------------+-----+-----------------------+ > Stream-stream joins - left outer join gives inconsistent output > -------------------------------------------------------------- > > Key: SPARK-26154 > URL: https://issues.apache.org/jira/browse/SPARK-26154 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.3.2 > Environment: Spark version - Spark 2.3.2 > OS- Suse 11 > Reporter: Haripriya > Priority: Major > > Stream-stream joins using left outer join gives inconsistent output > The data processed once, is being processed again and gives null value. In > Batch 3, the input data "3" is processed. 
But again in batch 6, null value > is provided for same data > {code:java} > scala> import org.apache.spark.sql.functions.{col, expr} > import org.apache.spark.sql.functions.{col, expr} > scala> import org.apache.spark.sql.streaming.Trigger > import org.apache.spark.sql.streaming.Trigger > scala> val lines_stream1 = spark.readStream. > | format("kafka"). > | option("kafka.bootstrap.servers", "ip:9092"). > | option("subscribe", "topic1"). > | option("includeTimestamp", true). > | load(). > | selectExpr("CAST (value AS String)","CAST(timestamp AS > TIMESTAMP)").as[(String,Timestamp)]. > | select(col("value") as("data"),col("timestamp") > as("recordTime")). > | select("data","recordTime"). > | withWatermark("recordTime", "5 seconds ") > lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = > [data: string, recordTime: timestamp] > scala> val lines_stream2 = spark.readStream. > | format("kafka"). > | option("kafka.bootstrap.servers", "ip:9092"). > | option("subscribe", "topic2"). > | option("includeTimestamp", value = true). > | load(). > | selectExpr("CAST (value AS String)","CAST(timestamp AS > TIMESTAMP)").as[(String,Timestamp)]. > | select(col("value") as("data1"),col("timestamp") > as("recordTime1")). > | select("data1","recordTime1"). > | withWatermark("recordTime1", "10 seconds ") > lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = > [data1: string, recordTime1: timestamp] > scala> val query = lines_stream1.join(lines_stream2, expr ( > | """ > | | data == data1 and > | | recordTime1 >= recordTime and > | | recordTime1 <= recordTime + interval 5 seconds > | """.stripMargin),"left"). > | writeStream. > | option("truncate","false"). > | outputMode("append"). > | format("console").option("checkpointLocation", > "/tmp/leftouter/"). > | trigger(Trigger.ProcessingTime ("5 seconds")). 
> | start() > query: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b > {code} > Step2 : Start producing data > kafka-console-producer.sh --broker-list ip:9092 --topic topic1 > >1 > >2 > >3 > >4 > >5 > >aa > >bb > >cc > kafka-console-producer.sh --broker-list ip:9092 --topic topic2 > >2 > >2 > >3 > >4 > >5 > >aa > >cc > >ee > >ee > > Output obtained: > Batch: 0 > ------------------------------------------- > +-----+---------++----------------+ > |data|recordTime|data1|recordTime1| > +-----+---------++----------------+ > +-----+---------++----------------+ > ------------------------------------------- > Batch: 1 > ------------------------------------------- > +-----+---------++----------------+ > |data|recordTime|data1|recordTime1| > +-----+---------++----------------+ > +-----+---------++----------------+ > ------------------------------------------- > Batch: 2 > ------------------------------------------- > +-----+----------------------++----------------------------+ > |data|recordTime|data1|recordTime1| > +-----+----------------------++----------------------------+ > |3|2018-11-22 20:09:35.053|3|2018-11-22 20:09:36.506| > |2|2018-11-22 20:09:31.613|2|2018-11-22 20:09:33.116| > +-----+----------------------++----------------------------+ > ------------------------------------------- > Batch: 3 > ------------------------------------------- > +-----+----------------------++----------------------------+ > |data|recordTime|data1|recordTime1| > +-----+----------------------++----------------------------+ > |4|2018-11-22 20:09:38.654|4|2018-11-22 20:09:39.818| > +-----+----------------------++----------------------------+ > ------------------------------------------- > Batch: 4 > ------------------------------------------- > +-----+----------------------++----------------------------+ > |data|recordTime|data1|recordTime1| > +-----+----------------------++----------------------------+ > 
|5|2018-11-22 20:09:44.809|5|2018-11-22 20:09:47.452| > |1|2018-11-22 20:09:22.662|null|null| > +-----+----------------------++----------------------------+ > ------------------------------------------- > Batch: 5 > ------------------------------------------- > +-----+----------------------++----------------------------+ > |data|recordTime|data1|recordTime1| > +-----+----------------------++----------------------------+ > |cc|2018-11-22 20:10:06.654|cc|2018-11-22 20:10:08.701| > |aa|2018-11-22 20:10:01.536|aa|2018-11-22 20:10:03.259| > +-----+----------------------++----------------------------+ > ------------------------------------------- > Batch: 6 > ------------------------------------------- > +-----+----------------------++----------------+ > |data|recordTime|data1|recordTime1| > +-----+----------------------++----------------+ > |3|2018-11-22 20:09:35.053|null|null| > +-----+----------------------++----------------+ -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org