[ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16756716#comment-16756716 ]
Sean Owen commented on SPARK-26154:
-----------------------------------

Sure, it's a judgment call. If you see people contemplating it as blocking a release, I think that change would be fair to make. You can update other JIRA elements if you're pretty sure, given your experience with the project, that it should have a different label or whatever. What we don't want is people with no experience in the project simply marking things Blocker and setting a bunch of unhelpful flags like 'Important', or tagging it 'spark'. If you know enough to be thoughtful about it, I bet your edits would be OK. If in doubt, ask.

> Stream-stream joins - left outer join gives inconsistent output
> ---------------------------------------------------------------
>
>                 Key: SPARK-26154
>                 URL: https://issues.apache.org/jira/browse/SPARK-26154
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>         Environment: Spark version - Spark 2.3.2
> OS - SUSE 11
>            Reporter: Haripriya
>            Priority: Critical
>              Labels: correctness
>
> Stream-stream joins using a left outer join give inconsistent output.
> Data that has already been processed is processed again and produces a null value.
> In batch 2, the input record "3" is joined with its match from topic2. But in
> batch 6, a null-padded row is emitted again for the same record.
>
> Steps:
> Step 1: In spark-shell
> {code:java}
> scala> import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.functions.{col, expr}
> scala> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.streaming.Trigger
> scala> import java.sql.Timestamp
> import java.sql.Timestamp
> scala> val lines_stream1 = spark.readStream.
>      | format("kafka").
>      | option("kafka.bootstrap.servers", "ip:9092").
>      | option("subscribe", "topic1").
>      | option("includeTimestamp", true).
>      | load().
>      | selectExpr("CAST (value AS String)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
>      | select(col("value") as("data"), col("timestamp") as("recordTime")).
>      | select("data", "recordTime").
>      | withWatermark("recordTime", "5 seconds")
> lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]
> scala> val lines_stream2 = spark.readStream.
>      | format("kafka").
>      | option("kafka.bootstrap.servers", "ip:9092").
>      | option("subscribe", "topic2").
>      | option("includeTimestamp", value = true).
>      | load().
>      | selectExpr("CAST (value AS String)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
>      | select(col("value") as("data1"), col("timestamp") as("recordTime1")).
>      | select("data1", "recordTime1").
>      | withWatermark("recordTime1", "10 seconds")
> lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]
> scala> val query = lines_stream1.join(lines_stream2, expr(
>      | """
>      | | data == data1 and
>      | | recordTime1 >= recordTime and
>      | | recordTime1 <= recordTime + interval 5 seconds
>      | """.stripMargin), "left").
>      | writeStream.
>      | option("truncate", "false").
>      | outputMode("append").
>      | format("console").option("checkpointLocation", "/tmp/leftouter/").
>      | trigger(Trigger.ProcessingTime("5 seconds")).
>      | start()
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
> {code}
> Step 2: Start producing data
> kafka-console-producer.sh --broker-list ip:9092 --topic topic1
> >1
> >2
> >3
> >4
> >5
> >aa
> >bb
> >cc
> kafka-console-producer.sh --broker-list ip:9092 --topic topic2
> >2
> >2
> >3
> >4
> >5
> >aa
> >cc
> >ee
> >ee
>
> Output obtained:
> {code:java}
> Batch: 0
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
> |2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
> |1   |2018-11-22 20:09:22.662|null |null                   |
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
> |aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +----+-----------------------+-----+-----------+
> |data|recordTime             |data1|recordTime1|
> +----+-----------------------+-----+-----------+
> |3   |2018-11-22 20:09:35.053|null |null       |
> +----+-----------------------+-----+-----------+
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
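The expected semantics can be stated as an invariant: in a left outer stream-stream join, a left-side row should produce a null-padded result at most once, and only if no right-side row matched it before the watermark made further matches impossible. The sketch below is a simplified, hypothetical model of that rule in plain Scala with no Spark dependency. `LeftRow`, `LeftOuterJoinModel`, the per-row `matched` flag and the single combined watermark are illustrative assumptions, not Spark's actual join operator. Under this rule, record "3", which matched in batch 2, would never be re-emitted with nulls as it is in batch 6.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model, not Spark's implementation: one buffered left input,
// a single event-time watermark in milliseconds, and a per-row "matched" flag.
final class LeftRow(val data: String, val recordTime: Long) {
  var matched: Boolean = false // set once any right row joins with this row
}

final class LeftOuterJoinModel(joinWindowMs: Long) {
  private val buffer = ArrayBuffer.empty[LeftRow]

  def addLeft(data: String, recordTime: Long): Unit =
    buffer += new LeftRow(data, recordTime)

  // Mirrors the query's condition:
  // data == data1 AND recordTime <= recordTime1 <= recordTime + window.
  def addRight(data1: String, recordTime1: Long): Seq[(String, Option[String])] = {
    val hits = buffer.filter { row =>
      row.data == data1 &&
      recordTime1 >= row.recordTime &&
      recordTime1 <= row.recordTime + joinWindowMs
    }
    hits.foreach(row => row.matched = true)
    hits.toSeq.map(row => (row.data, Some(data1)))
  }

  // Once the watermark passes recordTime + window, no future right row can match,
  // so the row leaves state; only rows that never matched yield (data, None).
  def advanceWatermark(watermarkMs: Long): Seq[(String, Option[String])] = {
    val (expired, kept) = buffer.partition(_.recordTime + joinWindowMs < watermarkMs)
    buffer.clear()
    buffer ++= kept
    expired.toSeq.filterNot(_.matched).map(row => (row.data, None))
  }
}

// Usage with timestamps (milliseconds within the minute) taken from the report's output.
val join = new LeftOuterJoinModel(joinWindowMs = 5000L)
join.addLeft("3", recordTime = 35053L)
join.addLeft("1", recordTime = 22662L)
println(join.addRight("3", recordTime1 = 36506L)) // matched row for "3"
println(join.advanceWatermark(60000L))            // only "1" is null-padded
```

In this model the `matched` flag is what prevents the duplicate: eviction by watermark alone cannot distinguish a matched left row from an unmatched one, so without the flag every expired row would be emitted with nulls.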