[ 
https://issues.apache.org/jira/browse/SPARK-25834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Ramachandra Setty updated SPARK-25834:
---------------------------------------------
    Description: 
Execute the program below and observe that no AnalysisException is thrown.

import java.sql.Timestamp
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.streaming.Trigger

val lines_stream1 = spark.readStream.
 format("kafka").
 option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
 option("subscribe", "test11").
 option("includeTimestamp", true).
 load().
 selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
 select(col("value") as("data"),col("timestamp") as("recordTime")).
 select("data","recordTime").
 withWatermark("recordTime", "20 seconds ")

val lines_stream2 = spark.readStream.
 format("kafka").
 option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
 option("subscribe", "test22").
 option("includeTimestamp", value = true).
 load().
 selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
 select(col("value") as("data1"),col("timestamp") as("recordTime1")).
 select("data1","recordTime1").
 withWatermark("recordTime1", "20 seconds ")

 
 val query = lines_stream1.join(lines_stream2, expr (
 """
 | data == data1 and
 | recordTime1 >= recordTime and
 | recordTime1 <= recordTime + interval 20 seconds
 """.stripMargin),"right").
 writeStream.
 option("truncate","false").
 outputMode("update").
 format("console").
 trigger(Trigger.ProcessingTime ("2 second")).
 start()
 
query.awaitTermination()

According to the documentation at
[https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html#stream-stream-joins]
 
 stream-stream joins are supported only in Append output mode:

*As of Spark 2.3, you can use joins only when the query is in Append output 
mode. Other output modes are not yet supported.*

Inner joins behave as described in the Spark documentation, but outer joins 
do not: no AnalysisException is thrown when the output mode is update.

  was:
Execute the below program and can see there is no AnalysisException thrown

import java.sql.Timestamp
 import org.apache.spark.sql.functions.\{col, expr}
 import org.apache.spark.sql.streaming.Trigger
 
 val lines_stream1 = spark.readStream.
 format("kafka").
 option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
 option("subscribe", "test11").
 option("includeTimestamp", true).
 load().
 selectExpr("CAST (value AS String)","CAST(timestamp AS 
TIMESTAMP)").as[(String,Timestamp)].
 select(col("value") as("data"),col("timestamp") as("recordTime")).
 select("data","recordTime").
 withWatermark("recordTime", "20 seconds ")

val lines_stream2 = spark.readStream.
 format("kafka").
 option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
 option("subscribe", "test22").
 option("includeTimestamp", value = true).
 load().
 selectExpr("CAST (value AS String)","CAST(timestamp AS 
TIMESTAMP)").as[(String,Timestamp)].
 select(col("value") as("data1"),col("timestamp") as("recordTime1")).
 select("data1","recordTime1").
 withWatermark("recordTime1", "20 seconds ")


 val query = lines_stream1.join(lines_stream2, expr (
 """
 | data == data1 and
 | recordTime1 >= recordTime and
 | recordTime1 <= recordTime + interval 20 seconds
 """.stripMargin),"*left*").
 writeStream.
 option("truncate","false").
 outputMode("update").
 format("console").
 trigger(Trigger.ProcessingTime ("2 second")).
 start()

query.awaitTermination()

As per the document 
https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html#stream-stream-joins
 
 joins are only supported in append mode

*As of Spark 2.3, you can use joins only when the query is in Append output 
mode. Other output modes are not yet supported.*

Inner join is working as per spark documentation but it is failed for outer 
joins


> stream stream Outer join with update mode is not throwing exception
> -------------------------------------------------------------------
>
>                 Key: SPARK-25834
>                 URL: https://issues.apache.org/jira/browse/SPARK-25834
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1, 2.3.2
>            Reporter: Sachin Ramachandra Setty
>            Priority: Minor
>
> Execute the program below and observe that no AnalysisException is thrown.
> import java.sql.Timestamp
>  import org.apache.spark.sql.functions.{col, expr}
>  import org.apache.spark.sql.streaming.Trigger
> val lines_stream1 = spark.readStream.
>  format("kafka").
>  option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
>  option("subscribe", "test11").
>  option("includeTimestamp", true).
>  load().
>  selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
>  select(col("value") as("data"),col("timestamp") as("recordTime")).
>  select("data","recordTime").
>  withWatermark("recordTime", "20 seconds ")
> val lines_stream2 = spark.readStream.
>  format("kafka").
>  option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
>  option("subscribe", "test22").
>  option("includeTimestamp", value = true).
>  load().
>  selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
>  select(col("value") as("data1"),col("timestamp") as("recordTime1")).
>  select("data1","recordTime1").
>  withWatermark("recordTime1", "20 seconds ")
>  
>  val query = lines_stream1.join(lines_stream2, expr (
>  """
>  | data == data1 and
>  | recordTime1 >= recordTime and
>  | recordTime1 <= recordTime + interval 20 seconds
>  """.stripMargin),"right").
>  writeStream.
>  option("truncate","false").
>  outputMode("update").
>  format("console").
>  trigger(Trigger.ProcessingTime ("2 second")).
>  start()
>  
> query.awaitTermination()
> According to the documentation at
> [https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html#stream-stream-joins]
>  
>  stream-stream joins are supported only in Append output mode:
> *As of Spark 2.3, you can use joins only when the query is in Append output 
> mode. Other output modes are not yet supported.*
> Inner joins behave as described in the Spark documentation, but outer joins 
> do not: no AnalysisException is thrown when the output mode is update.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to