[ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haripriya updated SPARK-26154:
------------------------------
    Summary: Stream-stream joins - left outer join gives inconsistent output  (was: Stream-stream joins - left outer join gives inconistent output)

> Stream-stream joins - left outer join gives inconsistent output
> ---------------------------------------------------------------
>
>                 Key: SPARK-26154
>                 URL: https://issues.apache.org/jira/browse/SPARK-26154
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>         Environment: Spark version - Spark 2.3.2
> OS - SUSE 11
>            Reporter: Haripriya
>            Priority: Major
>
> A stream-stream left outer join gives inconsistent output.
> Data that has already been joined and emitted is emitted again later with null values.
> In Batch 2, the input data "3" is joined and emitted. In Batch 6, the same row is
> emitted again with null values for data1 and recordTime1.
>
> Step 1: Start the streaming query in spark-shell
> {code:java}
> scala> import java.sql.Timestamp
> import java.sql.Timestamp
> scala> import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.functions.{col, expr}
> scala> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.streaming.Trigger
> scala> val lines_stream1 = spark.readStream.
>      |   format("kafka").
>      |   option("kafka.bootstrap.servers", "ip:9092").
>      |   option("subscribe", "topic1").
>      |   option("includeTimestamp", true).
>      |   load().
>      |   selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
>      |   select(col("value") as("data"),col("timestamp") as("recordTime")).
>      |   select("data","recordTime").
>      |   withWatermark("recordTime", "5 seconds ")
> lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]
> scala> val lines_stream2 = spark.readStream.
>      |   format("kafka").
>      |   option("kafka.bootstrap.servers", "ip:9092").
>      |   option("subscribe", "topic2").
>      |   option("includeTimestamp", value = true).
>      |   load().
>      |   selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
>      |   select(col("value") as("data1"),col("timestamp") as("recordTime1")).
>      |   select("data1","recordTime1").
>      |   withWatermark("recordTime1", "10 seconds ")
> lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]
> scala> val query = lines_stream1.join(lines_stream2, expr (
>      |   """
>      |     | data == data1 and
>      |     | recordTime1 >= recordTime and
>      |     | recordTime1 <= recordTime + interval 5 seconds
>      |   """.stripMargin),"left").
>      |   writeStream.
>      |   option("truncate","false").
>      |   outputMode("append").
>      |   format("console").option("checkpointLocation", "/tmp/leftouter/").
>      |   trigger(Trigger.ProcessingTime ("5 seconds")).
>      |   start()
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
> {code}
> Step 2: Start producing data
> kafka-console-producer.sh --broker-list ip:9092 --topic topic1
> >1
> >2
> >3
> >4
> >5
> >aa
> >bb
> >cc
> kafka-console-producer.sh --broker-list ip:9092 --topic topic2
> >2
> >2
> >3
> >4
> >5
> >aa
> >cc
> >ee
> >ee
>
> Output obtained:
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
> |2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
> |1   |2018-11-22 20:09:22.662|null |null                   |
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
> |aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +----+-----------------------+-----+-----------+
> |data|recordTime             |data1|recordTime1|
> +----+-----------------------+-----+-----------+
> |3   |2018-11-22 20:09:35.053|null |null       |
> +----+-----------------------+-----+-----------+
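
The symptom in the output above can be stated as a simple check: in append mode, each row from the left stream is expected to appear once, either joined with a match or padded with nulls, but not both. The following is a minimal sketch of that check in plain Scala with no Spark dependency. The names `JoinedRow`, `LeftOuterCheck`, and `inconsistentKeys` are hypothetical and introduced only for illustration. The rows are copied by hand from the console output in this report, and the check assumes each value was produced to topic1 only once, as in the reproduction steps.

```scala
// One emitted row of the joined console output: the batch it appeared in,
// the left-side key (data), and the right-side key (data1) if it was matched.
final case class JoinedRow(batch: Int, data: String, data1: Option[String])

object LeftOuterCheck {
  // Left keys that were emitted both as a matched row and as a null-padded row.
  // Assumes left keys are unique in the input (true for this reproduction only).
  def inconsistentKeys(rows: Seq[JoinedRow]): Set[String] = {
    val matched   = rows.collect { case JoinedRow(_, d, Some(_)) => d }.toSet
    val unmatched = rows.collect { case JoinedRow(_, d, None) => d }.toSet
    matched.intersect(unmatched)
  }

  // Non-empty batches from the report (Batch 0 and Batch 1 were empty).
  val reported: Seq[JoinedRow] = Seq(
    JoinedRow(2, "3", Some("3")),
    JoinedRow(2, "2", Some("2")),
    JoinedRow(3, "4", Some("4")),
    JoinedRow(4, "5", Some("5")),
    JoinedRow(4, "1", None),
    JoinedRow(5, "cc", Some("cc")),
    JoinedRow(5, "aa", Some("aa")),
    JoinedRow(6, "3", None)
  )
}
```

Calling `LeftOuterCheck.inconsistentKeys(LeftOuterCheck.reported)` flags only the key "3", which was matched in Batch 2 and emitted again with nulls in Batch 6. The null-padded row for "1" in Batch 4 is not flagged, since topic2 never received a "1".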