[jira] [Updated] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2020-03-02, Dongjoon Hyun (Jira)

 [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-26154:
--
Affects Version/s: 2.3.0

> Stream-stream joins - left outer join gives inconsistent output
> ---
>
> Key: SPARK-26154
> URL: https://issues.apache.org/jira/browse/SPARK-26154
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.2, 3.0.0
> Environment: Spark version - Spark 2.3.2
> OS- Suse 11
>Reporter: Haripriya
>Assignee: Jungtaek Lim
>Priority: Blocker
>  Labels: correctness
> Fix For: 3.0.0
>
>
> Stream-stream joins using a left outer join give inconsistent output.
> A row that has already been processed and joined is processed again and
> emitted with null values: in Batch 2 the input value "3" is joined and
> emitted, but in Batch 6 a null-padded row is emitted for the same value.
> Steps
> Step 1: In spark-shell
> {code:java}
> scala> import java.sql.Timestamp
> import java.sql.Timestamp
> scala> import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.functions.{col, expr}
> scala> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.streaming.Trigger
> scala> val lines_stream1 = spark.readStream.
>      |   format("kafka").
>      |   option("kafka.bootstrap.servers", "ip:9092").
>      |   option("subscribe", "topic1").
>      |   option("includeTimestamp", true).
>      |   load().
>      |   selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
>      |   select(col("value") as("data"), col("timestamp") as("recordTime")).
>      |   withWatermark("recordTime", "5 seconds")
> lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]
> scala> val lines_stream2 = spark.readStream.
>      |   format("kafka").
>      |   option("kafka.bootstrap.servers", "ip:9092").
>      |   option("subscribe", "topic2").
>      |   option("includeTimestamp", true).
>      |   load().
>      |   selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
>      |   select(col("value") as("data1"), col("timestamp") as("recordTime1")).
>      |   withWatermark("recordTime1", "10 seconds")
> lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]
> scala> val query = lines_stream1.join(lines_stream2, expr(
>      |   """
>      |     | data == data1 and
>      |     | recordTime1 >= recordTime and
>      |     | recordTime1 <= recordTime + interval 5 seconds
>      |   """.stripMargin), "left").
>      |   writeStream.
>      |   option("truncate", "false").
>      |   outputMode("append").
>      |   format("console").
>      |   option("checkpointLocation", "/tmp/leftouter/").
>      |   trigger(Trigger.ProcessingTime("5 seconds")).
>      |   start()
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
> {code}
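>
> For reference, the same pipeline can be reproduced without Kafka using
> Spark's MemoryStream testing source. This is only a sketch, not part of the
> original report: MemoryStream lives in an internal package
> (org.apache.spark.sql.execution.streaming) and its API may vary across
> versions.
> {code:java}
> import java.sql.Timestamp
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.functions.expr
>
> implicit val sqlCtx = spark.sqlContext
> import spark.implicits._
>
> // Two in-memory streams standing in for the two Kafka topics.
> val left  = MemoryStream[(String, Timestamp)]
> val right = MemoryStream[(String, Timestamp)]
>
> // Same column names and watermarks as the Kafka repro above.
> val leftDf  = left.toDS().toDF("data", "recordTime").
>   withWatermark("recordTime", "5 seconds")
> val rightDf = right.toDS().toDF("data1", "recordTime1").
>   withWatermark("recordTime1", "10 seconds")
>
> // Same join condition, join type, and output mode as above.
> val joined = leftDf.join(rightDf, expr(
>   """data == data1 and
>     |recordTime1 >= recordTime and
>     |recordTime1 <= recordTime + interval 5 seconds""".stripMargin), "left")
>
> val q = joined.writeStream.format("console").outputMode("append").start()
> {code}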
> Step 2: Start producing data
> kafka-console-producer.sh --broker-list ip:9092 --topic topic1
>  >1
>  >2
>  >3
>  >4
>  >5
>  >aa
>  >bb
>  >cc
> kafka-console-producer.sh --broker-list ip:9092 --topic topic2
>  >2
>  >2
>  >3
>  >4
>  >5
>  >aa
>  >cc
>  >ee
>  >ee
>  
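> If using the MemoryStream sketch above instead of Kafka, the equivalent
> input can be injected programmatically (the timestamps below are
> illustrative, not the original ones):
> {code:java}
> // Each addData call becomes visible to the next micro-batch.
> left.addData(Seq("1", "2", "3", "4", "5", "aa", "bb", "cc")
>   .map(v => (v, new Timestamp(System.currentTimeMillis()))))
> right.addData(Seq("2", "2", "3", "4", "5", "aa", "cc", "ee", "ee")
>   .map(v => (v, new Timestamp(System.currentTimeMillis()))))
> {code}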
> Output obtained:
> {code:java}
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
>
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
>
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
> |2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
> +----+-----------------------+-----+-----------------------+
>
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
> +----+-----------------------+-----+-----------------------+
>
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
> |1   |2018-11-22 20:09:22.662|null |null                   |
> +----+-----------------------+-----+-----------------------+
>
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
> |aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
> +----+-----------------------+-----+-----------------------+
> {code}
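>
> For context (an interpretation added here, hedged, not text from the
> original report): in append mode a left-side row should be emitted
> null-padded only if it never matched and the watermark guarantees no
> matching right-side row can still arrive. A rough sketch of that rule for
> this query's condition:
> {code:java}
> // Sketch only: hypothetical rule for when a left row with event time
> // recordTimeMs may be emitted with nulls, given the condition
> //   recordTime1 <= recordTime + interval 5 seconds
> val joinToleranceMs = 5000L
> def mayEmitNullPadded(recordTimeMs: Long, watermarkMs: Long,
>                       everMatched: Boolean): Boolean =
>   !everMatched && watermarkMs > recordTimeMs + joinToleranceMs
> {code}
> The value "3" matched and was emitted in Batch 2, so the null-padded row for
> the same value in Batch 6 violates this rule; that is the reported
> correctness issue.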

[jira] [Updated] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2020-01-25, Dongjoon Hyun (Jira)

 [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-26154:
--
Target Version/s: 3.0.0


[jira] [Updated] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2019-10-16, Thomas Graves (Jira)

 [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Graves updated SPARK-26154:
--
Affects Version/s: 3.0.0


[jira] [Updated] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2019-04-11, Jungtaek Lim (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim updated SPARK-26154:
-
Priority: Blocker  (was: Critical)


[jira] [Updated] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2019-01-30, Sean Owen (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-26154:
--
  Labels: correctness  (was: )
Priority: Critical  (was: Major)

[~kabhwan] I think everyone is able to modify the labels and priority (not
actually by design; we just can't restrict it). I made this 'Critical', though
priorities other than 'Blocker' don't mean much, and labeled it 'correctness'.


[jira] [Updated] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2018-11-22, Haripriya (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Haripriya updated SPARK-26154:
--
Description: 

[jira] [Updated] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2018-11-22, Haripriya (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Haripriya updated SPARK-26154:
--
Description: 


[jira] [Updated] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2018-11-22, Haripriya (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Haripriya updated SPARK-26154:
--
Summary: Stream-stream joins - left outer join gives inconsistent output  
(was: Stream-stream joins - left outer join gives inconistent output)
