[ https://issues.apache.org/jira/browse/SPARK-26187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16751672#comment-16751672 ]
Jungtaek Lim commented on SPARK-26187:
--------------------------------------

Thanks [~ChernikovP], your example helped a lot in tracking down the issue. Would you mind if we close this as "duplicate", since SPARK-26154 was filed earlier and its reporter is concerned about it?

> Stream-stream left outer join returns outer nulls for already matched rows
> --------------------------------------------------------------------------
>
>                 Key: SPARK-26187
>                 URL: https://issues.apache.org/jira/browse/SPARK-26187
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Pavel Chernikov
>            Priority: Major
>
> This is basically the same issue as SPARK-26154, but with a more easily reproducible and concrete example:
> {code:java}
> val rateStream = session.readStream
>   .format("rate")
>   .option("rowsPerSecond", 1)
>   .option("numPartitions", 1)
>   .load()
>
> import org.apache.spark.sql.functions._
>
> val fooStream = rateStream
>   .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
>
> val barStream = rateStream
>   // Introduce misses for ease of debugging
>   .where(col("value") % 2 === 0)
>   .select(col("value").as("barId"), col("timestamp").as("barTime"))
> {code}
> If barStream is configured to happen earlier than fooStream in the time-range condition, then everything works correctly and no previously matched records are flushed with outer NULLs:
> {code:java}
> val query = fooStream
>   .withWatermark("fooTime", "5 seconds")
>   .join(
>     barStream.withWatermark("barTime", "5 seconds"),
>     expr("""
>       barId = fooId AND
>       fooTime >= barTime AND
>       fooTime <= barTime + interval 5 seconds
>     """),
>     joinType = "leftOuter"
>   )
>   .writeStream
>   .format("console")
>   .option("truncate", false)
>   .start()
> {code}
> It's easy to observe that only odd rows are flushed with NULLs on the right:
> {code:java}
> [info] Batch: 1
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |0    |2018-11-27 13:12:34.976|0    |2018-11-27 13:12:34.976|
> [info] |6    |2018-11-27 13:12:40.976|6    |2018-11-27 13:12:40.976|
> [info] |10   |2018-11-27 13:12:44.976|10   |2018-11-27 13:12:44.976|
> [info] |8    |2018-11-27 13:12:42.976|8    |2018-11-27 13:12:42.976|
> [info] |2    |2018-11-27 13:12:36.976|2    |2018-11-27 13:12:36.976|
> [info] |4    |2018-11-27 13:12:38.976|4    |2018-11-27 13:12:38.976|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 2
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |1    |2018-11-27 13:12:35.976|null |null                   |
> [info] |3    |2018-11-27 13:12:37.976|null |null                   |
> [info] |12   |2018-11-27 13:12:46.976|12   |2018-11-27 13:12:46.976|
> [info] |18   |2018-11-27 13:12:52.976|18   |2018-11-27 13:12:52.976|
> [info] |14   |2018-11-27 13:12:48.976|14   |2018-11-27 13:12:48.976|
> [info] |20   |2018-11-27 13:12:54.976|20   |2018-11-27 13:12:54.976|
> [info] |16   |2018-11-27 13:12:50.976|16   |2018-11-27 13:12:50.976|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 3
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |26   |2018-11-27 13:13:00.976|26   |2018-11-27 13:13:00.976|
> [info] |22   |2018-11-27 13:12:56.976|22   |2018-11-27 13:12:56.976|
> [info] |7    |2018-11-27 13:12:41.976|null |null                   |
> [info] |9    |2018-11-27 13:12:43.976|null |null                   |
> [info] |28   |2018-11-27 13:13:02.976|28   |2018-11-27 13:13:02.976|
> [info] |5    |2018-11-27 13:12:39.976|null |null                   |
> [info] |11   |2018-11-27 13:12:45.976|null |null                   |
> [info] |13   |2018-11-27 13:12:47.976|null |null                   |
> [info] |24   |2018-11-27 13:12:58.976|24   |2018-11-27 13:12:58.976|
> [info] +-----+-----------------------+-----+-----------------------+
> {code}
> On the other hand, if we switch the ordering so that fooStream happens earlier in the time-range condition:
> {code:java}
> val query = fooStream
>   .withWatermark("fooTime", "5 seconds")
>   .join(
>     barStream.withWatermark("barTime", "5 seconds"),
>     expr("""
>       barId = fooId AND
>       barTime >= fooTime AND
>       barTime <= fooTime + interval 5 seconds
>     """),
>     joinType = "leftOuter"
>   )
>   .writeStream
>   .format("console")
>   .option("truncate", false)
>   .start()
> {code}
> Some, but not all, previously matched records (with even IDs) are emitted again with outer NULLs, along with all unmatched records (with odd IDs):
> {code:java}
> [info] Batch: 1
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |0    |2018-11-27 13:26:11.463|0    |2018-11-27 13:26:11.463|
> [info] |6    |2018-11-27 13:26:17.463|6    |2018-11-27 13:26:17.463|
> [info] |10   |2018-11-27 13:26:21.463|10   |2018-11-27 13:26:21.463|
> [info] |8    |2018-11-27 13:26:19.463|8    |2018-11-27 13:26:19.463|
> [info] |2    |2018-11-27 13:26:13.463|2    |2018-11-27 13:26:13.463|
> [info] |4    |2018-11-27 13:26:15.463|4    |2018-11-27 13:26:15.463|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 2
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |12   |2018-11-27 13:26:23.463|12   |2018-11-27 13:26:23.463|
> [info] |18   |2018-11-27 13:26:29.463|18   |2018-11-27 13:26:29.463|
> [info] |14   |2018-11-27 13:26:25.463|14   |2018-11-27 13:26:25.463|
> [info] |20   |2018-11-27 13:26:31.463|20   |2018-11-27 13:26:31.463|
> [info] |16   |2018-11-27 13:26:27.463|16   |2018-11-27 13:26:27.463|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 3
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |26   |2018-11-27 13:26:37.463|26   |2018-11-27 13:26:37.463|
> [info] |0    |2018-11-27 13:26:11.463|null |null                   |
> [info] |22   |2018-11-27 13:26:33.463|22   |2018-11-27 13:26:33.463|
> [info] |7    |2018-11-27 13:26:18.463|null |null                   |
> [info] |9    |2018-11-27 13:26:20.463|null |null                   |
> [info] |28   |2018-11-27 13:26:39.463|28   |2018-11-27 13:26:39.463|
> [info] |5    |2018-11-27 13:26:16.463|null |null                   |
> [info] |1    |2018-11-27 13:26:12.463|null |null                   |
> [info] |3    |2018-11-27 13:26:14.463|null |null                   |
> [info] |2    |2018-11-27 13:26:13.463|null |null                   |
> [info] |4    |2018-11-27 13:26:15.463|null |null                   |
> [info] |30   |2018-11-27 13:26:41.463|30   |2018-11-27 13:26:41.463|
> [info] |24   |2018-11-27 13:26:35.463|24   |2018-11-27 13:26:35.463|
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 4
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                |
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |19   |2018-11-27 13:26:30.463|null |null                   |
> [info] |34   |2018-11-27 13:26:45.463|34   |2018-11-27 13:26:45.463|
> [info] |32   |2018-11-27 13:26:43.463|32   |2018-11-27 13:26:43.463|
> [info] |17   |2018-11-27 13:26:28.463|null |null                   |
> [info] |10   |2018-11-27 13:26:21.463|null |null                   |
> [info] |12   |2018-11-27 13:26:23.463|null |null                   |
> [info] |11   |2018-11-27 13:26:22.463|null |null                   |
> [info] |13   |2018-11-27 13:26:24.463|null |null                   |
> [info] |36   |2018-11-27 13:26:47.463|36   |2018-11-27 13:26:47.463|
> [info] |14   |2018-11-27 13:26:25.463|null |null                   |
> [info] |15   |2018-11-27 13:26:26.463|null |null                   |
> [info] |38   |2018-11-27 13:26:49.463|38   |2018-11-27 13:26:49.463|
> [info] |40   |2018-11-27 13:26:51.463|40   |2018-11-27 13:26:51.463|
> [info] +-----+-----------------------+-----+-----------------------+
> {code}
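A left outer join must not emit a left-side row both with its match and with outer NULLs. The second query's output breaks this rule, and the violation can be checked mechanically. The following is a minimal pure-Scala sketch. It assumes the console rows have been transcribed by hand into a hypothetical `Emitted` case class. Neither `Emitted` nor `LeftOuterInvariant` is part of Spark, and only a subset of the printed rows is included.

```scala
// Hypothetical helper, not part of Spark: checks one part of the left outer
// join contract, namely that a left-side row is never emitted both with its
// match and with outer NULLs.
object LeftOuterInvariant {
  // One row printed by the console sink: batch number, fooId, and the
  // matched barId (None when the right side was printed as null).
  final case class Emitted(batch: Int, fooId: Int, barId: Option[Int])

  // Returns the fooIds that were emitted both with a match and with NULLs.
  def matchedThenNulled(rows: Seq[Emitted]): Set[Int] = {
    val matched = rows.collect { case Emitted(_, id, Some(_)) => id }.toSet
    val nulled  = rows.collect { case Emitted(_, id, None) => id }.toSet
    matched.intersect(nulled)
  }

  def main(args: Array[String]): Unit = {
    // Subset of the rows printed by the second query (fooStream earlier).
    val rows = Seq(
      Emitted(1, 0, Some(0)), Emitted(1, 2, Some(2)), Emitted(1, 4, Some(4)),
      Emitted(1, 10, Some(10)), Emitted(2, 12, Some(12)), Emitted(2, 14, Some(14)),
      Emitted(3, 0, None), Emitted(3, 1, None), Emitted(3, 2, None), Emitted(3, 4, None),
      Emitted(4, 10, None), Emitted(4, 11, None), Emitted(4, 12, None), Emitted(4, 14, None)
    )
    // prints: 0, 2, 4, 10, 12, 14
    println(matchedThenNulled(rows).toSeq.sorted.mkString(", "))
  }
}
```

Applied to the first query's output, the same check returns an empty set. In that run, only odd IDs are ever printed with NULLs, and only even IDs are ever matched.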