Haripriya created SPARK-26154:
---------------------------------

             Summary: Stream-stream joins - left outer join gives inconsistent output
                 Key: SPARK-26154
                 URL: https://issues.apache.org/jira/browse/SPARK-26154
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.3.2
         Environment: Spark version - Spark 2.3.2

OS- Suse 11
            Reporter: Haripriya


Stream-stream joins using a left outer join give inconsistent output.

Data that has already been processed and emitted is emitted again with null values. In Batch 2, the input "abc" is joined and output; but in Batch 5, a null-padded row is emitted again for the same record.
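The contract a stream-stream left outer join is expected to honor can be sketched in plain Scala (this is an illustrative model, not Spark's internals; all names are hypothetical): each left-side row should be emitted exactly once, either joined when a match arrives, or null-padded when the watermark evicts it unmatched, but never both.

```scala
// Hypothetical sketch of the expected left-outer-join contract.
// Not Spark code; names and types are illustrative only.
case class LeftRow(data: String, recordTime: Long)

class LeftOuterState {
  // Left rows buffered in state, awaiting a match or watermark eviction.
  private val buffered = scala.collection.mutable.Set[LeftRow]()

  def buffer(row: LeftRow): Unit = buffered += row

  // A right-side match arrives: emit the joined row once, drop it from state.
  def onMatch(row: LeftRow): Option[(String, Boolean)] =
    if (buffered.remove(row)) Some((row.data, true)) else None

  // The watermark passes the row's join window: a null-padded row should be
  // emitted ONLY if the row is still unmatched. Emitting it again for an
  // already-matched row (as happens for "abc" in Batch 5 below) is the
  // inconsistency reported here.
  def onEvict(row: LeftRow): Option[(String, Boolean)] =
    if (buffered.remove(row)) Some((row.data, false)) else None
}
```

Under this model, buffering "abc", matching it, and then evicting it yields one joined row and no null row.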

 

Steps:

In spark-shell
scala> import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.functions.{col, expr}
scala> import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.Trigger
scala> import java.sql.Timestamp
import java.sql.Timestamp
scala> val lines_stream1 = spark.readStream.
 | format("kafka").
 | option("kafka.bootstrap.servers", "ip:9092").
 | option("subscribe", "topic1").
 | option("includeTimestamp", true).
 | load().
 | selectExpr("CAST(value AS STRING)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
 | select(col("value") as("data"),col("timestamp") as("recordTime")).
 | select("data","recordTime").
 | withWatermark("recordTime", "5 seconds")
lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]
scala> val lines_stream2 = spark.readStream.
 | format("kafka").
 | option("kafka.bootstrap.servers", "ip:9092").
 | option("subscribe", "topic2").
 | option("includeTimestamp", value = true).
 | load().
 | selectExpr("CAST(value AS STRING)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
 | select(col("value") as("data1"),col("timestamp") as("recordTime1")).
 | select("data1","recordTime1").
 | withWatermark("recordTime1", "10 seconds")
lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]
scala> val query = lines_stream1.join(lines_stream2, expr(
 | """
 | | data == data1 and
 | | recordTime1 >= recordTime and
 | | recordTime1 <= recordTime + interval 5 seconds
 | """.stripMargin),"left").
 | writeStream.
 | option("truncate","false").
 | outputMode("append").
 | format("console").option("checkpointLocation", "/tmp/leftouter/").
 | trigger(Trigger.ProcessingTime ("5 seconds")).
 | start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
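For clarity, the join predicate above admits right-side events in a 5-second window starting at the left-side event time. An equivalent plain-Scala check (event times as epoch milliseconds; the function name is illustrative, not part of the repro) is:

```scala
// Plain-Scala restatement of the join's time condition above,
// with event times as epoch milliseconds. Name is illustrative only.
def withinJoinWindow(recordTime: Long, recordTime1: Long): Boolean =
  recordTime1 >= recordTime && recordTime1 <= recordTime + 5000L
```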

Step 2: Start producing data

 bin/kafka-console-producer.sh --broker-list ip:9092 --topic topic1
>abc
>def
>ghi
>123
>123
>234
>345

bin/kafka-console-producer.sh --broker-list ip:9092 --topic topic2
>abc
>ghi
>123
>345
>234
>678

 

Output obtained:

-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+-----+-----------+
|data|recordTime|data1|recordTime1|
+----+----------+-----+-----------+
+----+----------+-----+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+----------+-----+-----------+
|data|recordTime|data1|recordTime1|
+----+----------+-----+-----------+
+----+----------+-----+-----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----------------------+-----+-----------------------+
|data|recordTime             |data1|recordTime1            |
+----+-----------------------+-----+-----------------------+
|ghi |2018-11-02 10:52:26.072|ghi  |2018-11-02 10:52:28.309|
|abc |2018-11-02 10:52:18.627|abc  |2018-11-02 10:52:22.249|
+----+-----------------------+-----+-----------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----------------------+-----+-----------------------+
|data|recordTime             |data1|recordTime1            |
+----+-----------------------+-----+-----------------------+
|123 |2018-11-02 10:52:31.062|123  |2018-11-02 10:52:33.094|
+----+-----------------------+-----+-----------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----------------------+-----+-----------------------+
|data|recordTime             |data1|recordTime1            |
+----+-----------------------+-----+-----------------------+
|345 |2018-11-02 10:52:41.252|345  |2018-11-02 10:52:44.178|
+----+-----------------------+-----+-----------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+----+-----------------------+-----+-----------------------+
|data|recordTime             |data1|recordTime1            |
+----+-----------------------+-----+-----------------------+
|678 |2018-11-02 10:53:04.116|678  |2018-11-02 10:53:06.275|
|abc |2018-11-02 10:52:18.627|null |null                   |
|def |2018-11-02 10:52:24.296|null |null                   |
+----+-----------------------+-----+-----------------------+
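Why these rows surface in Batch 5: assuming the watermark tracks the maximum event time seen minus the declared delay, the 678 records advance it past the join windows of "abc" and "def", evicting their left-side state. "def" never matched, so its null row is the correct left-outer result; "abc" already matched in Batch 2, so its null row is the duplicate. A rough arithmetic check (times reduced to seconds within the hour, under the stated assumption):

```scala
// Seconds-within-the-hour helper for the timestamps in the output above.
def sec(m: Int, s: Double): Double = m * 60 + s

// Assumed watermark after Batch 5's input: max event time on stream2
// (10:53:06.275) minus the declared 10-second delay.
val watermark = sec(53, 6.275) - 10.0

// End of each left row's join window: recordTime + 5 seconds.
val abcWindowEnd = sec(52, 18.627) + 5.0
val defWindowEnd = sec(52, 24.296) + 5.0
```

Both window ends fall below the watermark, so both left rows are evicted in Batch 5.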



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
