[ https://issues.apache.org/jira/browse/SPARK-25834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sachin Ramachandra Setty updated SPARK-25834:
---------------------------------------------

    Description:

Execute the program below and observe that no AnalysisException is thrown, even though it runs a stream-stream outer join in update output mode:

import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.streaming.Trigger

val lines_stream1 = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
  option("subscribe", "test11").
  option("includeTimestamp", true).
  load().
  selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
  select(col("value").as("data"), col("timestamp").as("recordTime")).
  select("data", "recordTime").
  withWatermark("recordTime", "20 seconds")

val lines_stream2 = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
  option("subscribe", "test22").
  option("includeTimestamp", value = true).
  load().
  selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
  select(col("value").as("data1"), col("timestamp").as("recordTime1")).
  select("data1", "recordTime1").
  withWatermark("recordTime1", "20 seconds")

val query = lines_stream1.join(lines_stream2, expr(
  """
    | data == data1 and
    | recordTime1 >= recordTime and
    | recordTime1 <= recordTime + interval 20 seconds
  """.stripMargin), "right").
  writeStream.
  option("truncate", "false").
  outputMode("update").
  format("console").
  trigger(Trigger.ProcessingTime("2 seconds")).
  start()

query.awaitTermination()

As per the documentation (https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html#stream-stream-joins), stream-stream joins are only supported in append mode:

*As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.*

For inner joins this restriction is enforced as documented (an AnalysisException is thrown), but for outer joins no exception is thrown.

was: (previous version of the description: identical to the above except that the join type was "left" rather than "right")
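For reference, the missing check can be exercised without a Kafka cluster. The sketch below is my own minimal, self-contained variant of the program above, not part of the original report: it substitutes the built-in "rate" source, which emits (timestamp, value) rows, for the two Kafka topics, and keeps the same watermarks, join condition, and output mode. The stream names are hypothetical.

import org.apache.spark.sql.functions.{col, expr}

// Two independent rate streams stand in for the two Kafka topics.
val stream1 = spark.readStream.format("rate").option("rowsPerSecond", "1").load().
  select(col("value").as("data"), col("timestamp").as("recordTime")).
  withWatermark("recordTime", "20 seconds")

val stream2 = spark.readStream.format("rate").option("rowsPerSecond", "1").load().
  select(col("value").as("data1"), col("timestamp").as("recordTime1")).
  withWatermark("recordTime1", "20 seconds")

// Outer join with an event-time range condition, as in the report.
val joined = stream1.join(stream2, expr(
  """
    | data == data1 and
    | recordTime1 >= recordTime and
    | recordTime1 <= recordTime + interval 20 seconds
  """.stripMargin), "right")

// Per the 2.3.x documentation, start() should fail with an AnalysisException,
// since stream-stream joins are append-mode only. On 2.3.1/2.3.2 the outer-join
// query starts and runs instead; per the report, changing "right" to "inner"
// above is rejected as documented.
val query = joined.writeStream.
  outputMode("update").
  format("console").
  start()

That the inner-join variant is rejected while the outer-join variant runs suggests the analysis-time unsupported-operation check covers the inner-join case but misses outer joins.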
> stream stream Outer join with update mode is not throwing exception
> -------------------------------------------------------------------
>
>                 Key: SPARK-25834
>                 URL: https://issues.apache.org/jira/browse/SPARK-25834
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1, 2.3.2
>            Reporter: Sachin Ramachandra Setty
>            Priority: Minor