John P created SPARK-38906:
------------------------------

             Summary: Support joinWith() with watermarks
                 Key: SPARK-38906
                 URL: https://issues.apache.org/jira/browse/SPARK-38906
             Project: Spark
          Issue Type: Improvement
          Components: SQL, Structured Streaming
    Affects Versions: 3.2.0
            Reporter: John P
Structured Streaming requires watermarks for stream-stream outer joins. This makes it impossible to use joinWith(): the join below is rejected even though both inputs have watermarks. I've attached a self-contained example project [here|https://gist.github.com/jpassaro/886d5febe6e40bced03a3691115c84d5]; this is the relevant part:

{code:scala}
// interval is INTERVAL '00.2' SECOND (200 ms), as shown in the plan below
streamingDatasetLeft
  .withWatermark("whenA", "1 second")
  .joinWith(
    streamingDatasetRight.withWatermark("whenB", "1 second"),
    ($"idA" === $"idB") && $"whenB".between($"whenA" - interval, $"whenA" + interval),
    "leftOuter"
  )
{code}

Stack trace:

{noformat}
[error] org.apache.spark.sql.AnalysisException: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;
[error] Join LeftOuter, ((_1#40.idA = _2#41.idB) AND ((_2#41.whenB >= cast(whenA#9-T1000ms - INTERVAL '00.2' SECOND as timestamp)) AND (_2#41.whenB <= cast(_1#40.whenA + INTERVAL '00.2' SECOND as timestamp))))
[error] :- Project [named_struct(idA, idA#8, whenA, whenA#9-T1000ms) AS _1#40]
[error] :  +- EventTimeWatermark whenA#9: timestamp, 1 seconds
[error] :     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, example.ExampleA, true])).idA, true, false) AS idA#8, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(assertnotnull(input[0, example.ExampleA, true])).whenA, true, false) AS whenA#9]
[error] :        +- MapElements example.Main$$$Lambda$23191/0x00000008061a5840@5af65edf, interface org.apache.spark.sql.Row, [StructField(timestamp,TimestampType,true), StructField(value,LongType,true)], obj#7: example.ExampleA
[error] :           +- DeserializeToObject createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#0, true, false), value#1L, StructField(timestamp,TimestampType,true), StructField(value,LongType,true)), obj#6: org.apache.spark.sql.Row
[error] :              +- GlobalLimit 1
[error] :                 +- LocalLimit 1
[error] :                    +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@715e4ad8, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@7963c608, [rowsPerSecond=1], [timestamp#0, value#1L]
[error] +- Project [named_struct(idB, idB#22, value, value#23, whenB, whenB#24-T1000ms) AS _2#41]
[error]    +- EventTimeWatermark whenB#24: timestamp, 1 seconds
[error]       +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, example.ExampleB, true])).idB, true, false) AS idB#22, knownnotnull(assertnotnull(input[0, example.ExampleB, true])).value AS value#23, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(assertnotnull(input[0, example.ExampleB, true])).whenB, true, false) AS whenB#24]
[error]          +- MapElements example.Main$$$Lambda$23254/0x00000008061da840@2a5a573b, interface org.apache.spark.sql.Row, [StructField(timestamp,TimestampType,true), StructField(value,LongType,true)], obj#21: example.ExampleB
[error]             +- DeserializeToObject createexternalrow(staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#13, true, false), value#14L, StructField(timestamp,TimestampType,true), StructField(value,LongType,true)), obj#20: org.apache.spark.sql.Row
[error]                +- GlobalLimit 1
[error]                   +- LocalLimit 1
[error]                      +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@46bcce3d, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@25a4c0a2, [rowsPerSecond=1], [timestamp#13, value#14L]
[error]
{noformat}
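One possible workaround, sketched under assumptions and not taken from the reporter's gist: run the stream-stream join with the untyped `join`, so the watermarked columns `whenA` and `whenB` are referenced directly in the join condition rather than through the `_1`/`_2` structs that `joinWith` builds (the `named_struct(...) AS _1#40` and `AS _2#41` projections in the plan above), then rebuild a typed pair with `map`. The case classes `ExampleA` and `ExampleB` are hypothetical reconstructions from the field names in the plan, and `JoinWithWorkaround.leftOuterTyped` is an illustrative name, not a Spark API.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{Column, Dataset, SparkSession}
import org.apache.spark.sql.functions.expr

// Hypothetical case classes, reconstructed from the plan's field names
// (idA/whenA and idB/value/whenB); the real definitions live in the gist.
case class ExampleA(idA: String, whenA: Timestamp)
case class ExampleB(idB: String, value: Long, whenB: Timestamp)

object JoinWithWorkaround {

  /** Hypothetical helper: a left-outer stream-stream join written with the
    * untyped `join`, so the watermarked columns appear directly in the
    * condition, followed by a `map` back to a typed pair.
    */
  def leftOuterTyped(
      spark: SparkSession,
      left: Dataset[ExampleA],
      right: Dataset[ExampleB],
      // 200 ms, matching INTERVAL '00.2' SECOND in the reported plan
      interval: Column = expr("INTERVAL 200 MILLISECONDS")
  ): Dataset[(ExampleA, Option[ExampleB])] = {
    import spark.implicits._

    // Watermarks on both sides plus a range condition on whenB relative to
    // whenA, the shape the analyzer asks for in the error message above.
    val joined = left
      .withWatermark("whenA", "1 second")
      .join(
        right.withWatermark("whenB", "1 second"),
        ($"idA" === $"idB") &&
          $"whenB".between($"whenA" - interval, $"whenA" + interval),
        "leftOuter"
      )

    // idB is a join key, so it is null only in rows the left outer join
    // emitted without a match; those become None.
    joined.map { row =>
      val a = ExampleA(row.getAs[String]("idA"), row.getAs[Timestamp]("whenA"))
      val b =
        if (row.isNullAt(row.fieldIndex("idB"))) None
        else
          Some(
            ExampleB(
              row.getAs[String]("idB"),
              row.getAs[Long]("value"),
              row.getAs[Timestamp]("whenB")
            )
          )
      (a, b)
    }
  }
}
```

Returning `Option[ExampleB]` instead of a null right-hand value, as `joinWith` would produce for unmatched rows, is a design choice of this sketch. The same function also accepts batch Datasets, where `withWatermark` has no effect, which is a convenient way to check the join logic before running it on streams.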