John P created SPARK-38906:
------------------------------

             Summary: Support joinWith() with watermarks
                 Key: SPARK-38906
                 URL: https://issues.apache.org/jira/browse/SPARK-38906
             Project: Spark
          Issue Type: Improvement
          Components: SQL, Structured Streaming
    Affects Versions: 3.2.0
            Reporter: John P


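Structured Streaming requires a watermark for stream-stream outer joins. Even with watermarks defined on both inputs, an outer joinWith() fails this check, which makes it impossible to use joinWith() for outer joins between streams.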

I've posted a self-contained example project [here|https://gist.github.com/jpassaro/886d5febe6e40bced03a3691115c84d5]; this is the relevant part:

{code:scala}
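// Requires `import spark.implicits._` for the $"..." column syntax.
// `interval` is defined elsewhere in the example project; the plan below shows it as INTERVAL '00.2' SECOND.
streamingDatasetLeft
  .withWatermark("whenA", "1 second")
  .joinWith(
    streamingDatasetRight.withWatermark("whenB", "1 second"),
    ($"idA" === $"idB") && $"whenB".between($"whenA" - interval, $"whenA" + interval),
    "leftOuter"
  )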
{code}


Stack trace:

{noformat}
[error] org.apache.spark.sql.AnalysisException: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;
[error] Join LeftOuter, ((_1#40.idA = _2#41.idB) AND ((_2#41.whenB >= cast(whenA#9-T1000ms - INTERVAL '00.2' SECOND as timestamp)) AND (_2#41.whenB <= cast(_1#40.whenA + INTERVAL '00.2' SECOND as timestamp))))
[error] :- Project [named_struct(idA, idA#8, whenA, whenA#9-T1000ms) AS _1#40]
[error] :  +- EventTimeWatermark whenA#9: timestamp, 1 seconds
[error] :     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, example.ExampleA, true])).idA, true, false) AS idA#8, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(assertnotnull(input[0, example.ExampleA, true])).whenA, true, false) AS whenA#9]
[error] :        +- MapElements example.Main$$$Lambda$23191/0x00000008061a5840@5af65edf, interface org.apache.spark.sql.Row, [StructField(timestamp,TimestampType,true), StructField(value,LongType,true)], obj#7: example.ExampleA
[error] :           +- DeserializeToObject createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#0, true, false), value#1L, StructField(timestamp,TimestampType,true), StructField(value,LongType,true)), obj#6: org.apache.spark.sql.Row
[error] :              +- GlobalLimit 1
[error] :                 +- LocalLimit 1
[error] :                    +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@715e4ad8, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@7963c608, [rowsPerSecond=1], [timestamp#0, value#1L]
[error] +- Project [named_struct(idB, idB#22, value, value#23, whenB, whenB#24-T1000ms) AS _2#41]
[error]    +- EventTimeWatermark whenB#24: timestamp, 1 seconds
[error]       +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, example.ExampleB, true])).idB, true, false) AS idB#22, knownnotnull(assertnotnull(input[0, example.ExampleB, true])).value AS value#23, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(assertnotnull(input[0, example.ExampleB, true])).whenB, true, false) AS whenB#24]
[error]          +- MapElements example.Main$$$Lambda$23254/0x00000008061da840@2a5a573b, interface org.apache.spark.sql.Row, [StructField(timestamp,TimestampType,true), StructField(value,LongType,true)], obj#21: example.ExampleB
[error]             +- DeserializeToObject createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#13, true, false), value#14L, StructField(timestamp,TimestampType,true), StructField(value,LongType,true)), obj#20: org.apache.spark.sql.Row
[error]                +- GlobalLimit 1
[error]                   +- LocalLimit 1
[error]                      +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@46bcce3d, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@25a4c0a2, [rowsPerSecond=1], [timestamp#13, value#14L]
[error]
{noformat}
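The requirement in this message comes from how outer joins produce their null rows: Spark has to decide, at some point, that a left row will never find a match. The following is a minimal plain-Scala sketch of that reasoning for the range condition in this report (whenB between whenA - interval and whenA + interval). It is not Spark's implementation. It assumes a single event-time watermark (by default Spark combines the two inputs' watermarks by taking the minimum) and assumes right rows older than the watermark are too late to join. The object and method names are hypothetical.

{code:scala}
// Hypothetical sketch, not Spark code: models when an unmatched left row of a
// left outer join can be emitted with nulls, for the range condition
//   whenA - interval <= whenB <= whenA + interval
// All times are epoch milliseconds.
object OuterJoinWatermarkSketch {

  // Watermark as defined by withWatermark: the maximum event time seen so far
  // minus the configured delay.
  def watermark(maxEventTimeMs: Long, delayMs: Long): Long =
    maxEventTimeMs - delayMs

  // A left row can only match right rows with whenB <= whenA + interval.
  // Assuming right rows older than the watermark are too late to join, once
  // the watermark passes that upper bound no future right row can match, so
  // the left row can be emitted with nulls and dropped from state.
  // The strict comparison is an assumption of this sketch.
  def canEmitUnmatched(whenAMs: Long, intervalMs: Long, watermarkMs: Long): Boolean =
    watermarkMs > whenAMs + intervalMs
}
{code}

With the values from the plan above (a 1 second watermark delay and a 0.2 second interval), a left row with whenA at 10 s could be emitted as unmatched only once the watermark exceeds 10.2 s, that is, once both inputs have seen event times beyond 11.2 s. Without a watermark on the nullable side, or without the upper bound on whenB, no such point exists. In this report both are present, yet the joinWith() query is still rejected.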




--
This message was sent by Atlassian Jira
(v8.20.1#820001)
