[ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:
--------------------------------------------
    Description: 
Left outer join on two streams not emitting the null outputs. It is just 
waiting for the record to be added to other stream. Used socketstream to test 
this. In our case we want to emit the records with null values which doesn't 
match with id or/and not fall in time range condition

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", "./spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 

  was:
Left outer join on two streams not emitting the null outputs. It is just 
waiting for the record to be added to other stream. Used socketstream to test 
this. In our case we want to emit the records with null values which doesn't 
match with id or/and not fall in time range condition

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", ".spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 


> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23869
>                 URL: https://issues.apache.org/jira/browse/SPARK-23869
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: bharath kumar avusherla
>            Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Used socketstream to test 
> this. In our case we want to emit the records with null values which doesn't 
> match with id or/and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to