[ https://issues.apache.org/jira/browse/SPARK-26187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16748604#comment-16748604 ]
Jungtaek Lim commented on SPARK-26187: -------------------------------------- I guess I can reproduce this issue against master branch. {code} val rateStream = spark.readStream .format("rate") .option("rowsPerSecond", 1) .option("numPartitions", 1) .load() import org.apache.spark.sql.functions._ val fooStream = rateStream .select(col("value").as("fooId"), col("timestamp").as("fooTime")) val barStream = rateStream // Introduce misses for ease of debugging .where(col("value") % 2 === 0) .select(col("value").as("barId"), col("timestamp").as("barTime")) val query = fooStream .withWatermark("fooTime", "5 seconds") .join( barStream.withWatermark("barTime", "5 seconds"), expr(""" barId = fooId AND barTime >= fooTime AND barTime <= fooTime + interval 5 seconds """), joinType = "leftOuter" ) .writeStream .format("console") .option("truncate", false) .option("numRows", 10000) .start() {code} {noformat} ------------------------------------------- Batch: 1 ------------------------------------------- +-----+-----------------------+-----+-----------------------+ |fooId|fooTime |barId|barTime | +-----+-----------------------+-----+-----------------------+ |26 |2019-01-22 14:44:03.885|26 |2019-01-22 14:44:03.885| |54 |2019-01-22 14:44:31.885|54 |2019-01-22 14:44:31.885| |0 |2019-01-22 14:43:37.885|0 |2019-01-22 14:43:37.885| |22 |2019-01-22 14:43:59.885|22 |2019-01-22 14:43:59.885| |34 |2019-01-22 14:44:11.885|34 |2019-01-22 14:44:11.885| |50 |2019-01-22 14:44:27.885|50 |2019-01-22 14:44:27.885| |32 |2019-01-22 14:44:09.885|32 |2019-01-22 14:44:09.885| |6 |2019-01-22 14:43:43.885|6 |2019-01-22 14:43:43.885| |68 |2019-01-22 14:44:45.885|68 |2019-01-22 14:44:45.885| |72 |2019-01-22 14:44:49.885|72 |2019-01-22 14:44:49.885| |58 |2019-01-22 14:44:35.885|58 |2019-01-22 14:44:35.885| |56 |2019-01-22 14:44:33.885|56 |2019-01-22 14:44:33.885| |52 |2019-01-22 14:44:29.885|52 |2019-01-22 14:44:29.885| |28 |2019-01-22 14:44:05.885|28 |2019-01-22 14:44:05.885| |10 |2019-01-22 14:43:47.885|10 
|2019-01-22 14:43:47.885| |48 |2019-01-22 14:44:25.885|48 |2019-01-22 14:44:25.885| |44 |2019-01-22 14:44:21.885|44 |2019-01-22 14:44:21.885| |12 |2019-01-22 14:43:49.885|12 |2019-01-22 14:43:49.885| |74 |2019-01-22 14:44:51.885|74 |2019-01-22 14:44:51.885| |8 |2019-01-22 14:43:45.885|8 |2019-01-22 14:43:45.885| |62 |2019-01-22 14:44:39.885|62 |2019-01-22 14:44:39.885| |2 |2019-01-22 14:43:39.885|2 |2019-01-22 14:43:39.885| |66 |2019-01-22 14:44:43.885|66 |2019-01-22 14:44:43.885| |4 |2019-01-22 14:43:41.885|4 |2019-01-22 14:43:41.885| |36 |2019-01-22 14:44:13.885|36 |2019-01-22 14:44:13.885| |18 |2019-01-22 14:43:55.885|18 |2019-01-22 14:43:55.885| |14 |2019-01-22 14:43:51.885|14 |2019-01-22 14:43:51.885| |38 |2019-01-22 14:44:15.885|38 |2019-01-22 14:44:15.885| |30 |2019-01-22 14:44:07.885|30 |2019-01-22 14:44:07.885| |42 |2019-01-22 14:44:19.885|42 |2019-01-22 14:44:19.885| |46 |2019-01-22 14:44:23.885|46 |2019-01-22 14:44:23.885| |20 |2019-01-22 14:43:57.885|20 |2019-01-22 14:43:57.885| |70 |2019-01-22 14:44:47.885|70 |2019-01-22 14:44:47.885| |60 |2019-01-22 14:44:37.885|60 |2019-01-22 14:44:37.885| |40 |2019-01-22 14:44:17.885|40 |2019-01-22 14:44:17.885| |16 |2019-01-22 14:43:53.885|16 |2019-01-22 14:43:53.885| |64 |2019-01-22 14:44:41.885|64 |2019-01-22 14:44:41.885| |24 |2019-01-22 14:44:01.885|24 |2019-01-22 14:44:01.885| +-----+-----------------------+-----+-----------------------+ ------------------------------------------- Batch: 2 ------------------------------------------- +-----+-----------------------+-----+-----------------------+ |fooId|fooTime |barId|barTime | +-----+-----------------------+-----+-----------------------+ |29 |2019-01-22 14:44:06.885|null |null | |19 |2019-01-22 14:43:56.885|null |null | |112 |2019-01-22 14:45:29.885|112 |2019-01-22 14:45:29.885| |7 |2019-01-22 14:43:44.885|null |null | |94 |2019-01-22 14:45:11.885|94 |2019-01-22 14:45:11.885| |110 |2019-01-22 14:45:27.885|110 |2019-01-22 14:45:27.885| |57 |2019-01-22 
14:44:34.885|null |null | |43 |2019-01-22 14:44:20.885|null |null | |84 |2019-01-22 14:45:01.885|84 |2019-01-22 14:45:01.885| |31 |2019-01-22 14:44:08.885|null |null | |98 |2019-01-22 14:45:15.885|98 |2019-01-22 14:45:15.885| |39 |2019-01-22 14:44:16.885|null |null | |25 |2019-01-22 14:44:02.885|null |null | |9 |2019-01-22 14:43:46.885|null |null | |63 |2019-01-22 14:44:40.885|null |null | |27 |2019-01-22 14:44:04.885|null |null | |51 |2019-01-22 14:44:28.885|null |null | |17 |2019-01-22 14:43:54.885|null |null | |41 |2019-01-22 14:44:18.885|null |null | |88 |2019-01-22 14:45:05.885|88 |2019-01-22 14:45:05.885| |33 |2019-01-22 14:44:10.885|null |null | |114 |2019-01-22 14:45:31.885|114 |2019-01-22 14:45:31.885| |5 |2019-01-22 14:43:42.885|null |null | |1 |2019-01-22 14:43:38.885|null |null | |96 |2019-01-22 14:45:13.885|96 |2019-01-22 14:45:13.885| |104 |2019-01-22 14:45:21.885|104 |2019-01-22 14:45:21.885| |106 |2019-01-22 14:45:23.885|106 |2019-01-22 14:45:23.885| |100 |2019-01-22 14:45:17.885|100 |2019-01-22 14:45:17.885| |108 |2019-01-22 14:45:25.885|108 |2019-01-22 14:45:25.885| |61 |2019-01-22 14:44:38.885|null |null | |3 |2019-01-22 14:43:40.885|null |null | |37 |2019-01-22 14:44:14.885|null |null | |55 |2019-01-22 14:44:32.885|null |null | |49 |2019-01-22 14:44:26.885|null |null | |11 |2019-01-22 14:43:48.885|null |null | |35 |2019-01-22 14:44:12.885|null |null | |80 |2019-01-22 14:44:57.885|80 |2019-01-22 14:44:57.885| |76 |2019-01-22 14:44:53.885|76 |2019-01-22 14:44:53.885| |102 |2019-01-22 14:45:19.885|102 |2019-01-22 14:45:19.885| |92 |2019-01-22 14:45:09.885|92 |2019-01-22 14:45:09.885| |13 |2019-01-22 14:43:50.885|null |null | |78 |2019-01-22 14:44:55.885|78 |2019-01-22 14:44:55.885| |21 |2019-01-22 14:43:58.885|null |null | |59 |2019-01-22 14:44:36.885|null |null | |15 |2019-01-22 14:43:52.885|null |null | |82 |2019-01-22 14:44:59.885|82 |2019-01-22 14:44:59.885| |90 |2019-01-22 14:45:07.885|90 |2019-01-22 14:45:07.885| |23 |2019-01-22 
14:44:00.885|null |null | |86 |2019-01-22 14:45:03.885|86 |2019-01-22 14:45:03.885| |53 |2019-01-22 14:44:30.885|null |null | |47 |2019-01-22 14:44:24.885|null |null | |45 |2019-01-22 14:44:22.885|null |null | +-----+-----------------------+-----+-----------------------+ ------------------------------------------- Batch: 3 ------------------------------------------- +-----+-----------------------+-----+-----------------------+ |fooId|fooTime |barId|barTime | +-----+-----------------------+-----+-----------------------+ |65 |2019-01-22 14:44:42.885|null |null | |130 |2019-01-22 14:45:47.885|130 |2019-01-22 14:45:47.885| |77 |2019-01-22 14:44:54.885|null |null | |126 |2019-01-22 14:45:43.885|126 |2019-01-22 14:45:43.885| |136 |2019-01-22 14:45:53.885|136 |2019-01-22 14:45:53.885| |144 |2019-01-22 14:46:01.885|144 |2019-01-22 14:46:01.885| |116 |2019-01-22 14:45:33.885|116 |2019-01-22 14:45:33.885| |124 |2019-01-22 14:45:41.885|124 |2019-01-22 14:45:41.885| |95 |2019-01-22 14:45:12.885|null |null | |71 |2019-01-22 14:44:48.885|null |null | |68 |2019-01-22 14:44:45.885|null |null | |87 |2019-01-22 14:45:04.885|null |null | |150 |2019-01-22 14:46:07.885|150 |2019-01-22 14:46:07.885| |146 |2019-01-22 14:46:03.885|146 |2019-01-22 14:46:03.885| |79 |2019-01-22 14:44:56.885|null |null | |103 |2019-01-22 14:45:20.885|null |null | |138 |2019-01-22 14:45:55.885|138 |2019-01-22 14:45:55.885| |120 |2019-01-22 14:45:37.885|120 |2019-01-22 14:45:37.885| |89 |2019-01-22 14:45:06.885|null |null | |128 |2019-01-22 14:45:45.885|128 |2019-01-22 14:45:45.885| |85 |2019-01-22 14:45:02.885|null |null | |67 |2019-01-22 14:44:44.885|null |null | |142 |2019-01-22 14:45:59.885|142 |2019-01-22 14:45:59.885| |132 |2019-01-22 14:45:49.885|132 |2019-01-22 14:45:49.885| |101 |2019-01-22 14:45:18.885|null |null | |83 |2019-01-22 14:45:00.885|null |null | |154 |2019-01-22 14:46:11.885|154 |2019-01-22 14:46:11.885| |66 |2019-01-22 14:44:43.885|null |null | |134 |2019-01-22 14:45:51.885|134 
|2019-01-22 14:45:51.885| |118 |2019-01-22 14:45:35.885|118 |2019-01-22 14:45:35.885| |75 |2019-01-22 14:44:52.885|null |null | |140 |2019-01-22 14:45:57.885|140 |2019-01-22 14:45:57.885| |69 |2019-01-22 14:44:46.885|null |null | |122 |2019-01-22 14:45:39.885|122 |2019-01-22 14:45:39.885| |81 |2019-01-22 14:44:58.885|null |null | |97 |2019-01-22 14:45:14.885|null |null | |73 |2019-01-22 14:44:50.885|null |null | |99 |2019-01-22 14:45:16.885|null |null | |93 |2019-01-22 14:45:10.885|null |null | |64 |2019-01-22 14:44:41.885|null |null | |152 |2019-01-22 14:46:09.885|152 |2019-01-22 14:46:09.885| |91 |2019-01-22 14:45:08.885|null |null | |148 |2019-01-22 14:46:05.885|148 |2019-01-22 14:46:05.885| +-----+-----------------------+-----+-----------------------+ {noformat} fooIds 64 and 68 are matched against barIds 64 and 68 in batch 1, but are also emitted with null matches in batch 3. > Stream-stream left outer join returns outer nulls for already matched rows > -------------------------------------------------------------------------- > > Key: SPARK-26187 > URL: https://issues.apache.org/jira/browse/SPARK-26187 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.3.2 > Reporter: Pavel Chernikov > Priority: Major > > This is basically the same issue as SPARK-26154, but with a slightly more easily > reproducible and concrete example: > {code:java} > val rateStream = session.readStream > .format("rate") > .option("rowsPerSecond", 1) > .option("numPartitions", 1) > .load() > import org.apache.spark.sql.functions._ > val fooStream = rateStream > .select(col("value").as("fooId"), col("timestamp").as("fooTime")) > val barStream = rateStream > // Introduce misses for ease of debugging > .where(col("value") % 2 === 0) > .select(col("value").as("barId"), col("timestamp").as("barTime")){code} > If barStream is configured to happen earlier than fooStream, based on the time > range condition, then everything is all right: no previously matched records > are flushed 
with outer NULLs: > {code:java} > val query = fooStream > .withWatermark("fooTime", "5 seconds") > .join( > barStream.withWatermark("barTime", "5 seconds"), > expr(""" > barId = fooId AND > fooTime >= barTime AND > fooTime <= barTime + interval 5 seconds > """), > joinType = "leftOuter" > ) > .writeStream > .format("console") > .option("truncate", false) > .start(){code} > It's easy to observe that only odd rows are flushed with NULLs on the right: > {code:java} > [info] Batch: 1 > [info] +-----+-----------------------+-----+-----------------------+ > [info] |fooId|fooTime |barId|barTime | > [info] +-----+-----------------------+-----+-----------------------+ > [info] |0 |2018-11-27 13:12:34.976|0 |2018-11-27 13:12:34.976| > [info] |6 |2018-11-27 13:12:40.976|6 |2018-11-27 13:12:40.976| > [info] |10 |2018-11-27 13:12:44.976|10 |2018-11-27 13:12:44.976| > [info] |8 |2018-11-27 13:12:42.976|8 |2018-11-27 13:12:42.976| > [info] |2 |2018-11-27 13:12:36.976|2 |2018-11-27 13:12:36.976| > [info] |4 |2018-11-27 13:12:38.976|4 |2018-11-27 13:12:38.976| > [info] +-----+-----------------------+-----+-----------------------+ > [info] Batch: 2 > [info] +-----+-----------------------+-----+-----------------------+ > [info] |fooId|fooTime |barId|barTime | > [info] +-----+-----------------------+-----+-----------------------+ > [info] |1 |2018-11-27 13:12:35.976|null |null | > [info] |3 |2018-11-27 13:12:37.976|null |null | > [info] |12 |2018-11-27 13:12:46.976|12 |2018-11-27 13:12:46.976| > [info] |18 |2018-11-27 13:12:52.976|18 |2018-11-27 13:12:52.976| > [info] |14 |2018-11-27 13:12:48.976|14 |2018-11-27 13:12:48.976| > [info] |20 |2018-11-27 13:12:54.976|20 |2018-11-27 13:12:54.976| > [info] |16 |2018-11-27 13:12:50.976|16 |2018-11-27 13:12:50.976| > [info] +-----+-----------------------+-----+-----------------------+ > [info] Batch: 3 > [info] +-----+-----------------------+-----+-----------------------+ > [info] |fooId|fooTime |barId|barTime | > [info] 
+-----+-----------------------+-----+-----------------------+ > [info] |26 |2018-11-27 13:13:00.976|26 |2018-11-27 13:13:00.976| > [info] |22 |2018-11-27 13:12:56.976|22 |2018-11-27 13:12:56.976| > [info] |7 |2018-11-27 13:12:41.976|null |null | > [info] |9 |2018-11-27 13:12:43.976|null |null | > [info] |28 |2018-11-27 13:13:02.976|28 |2018-11-27 13:13:02.976| > [info] |5 |2018-11-27 13:12:39.976|null |null | > [info] |11 |2018-11-27 13:12:45.976|null |null | > [info] |13 |2018-11-27 13:12:47.976|null |null | > [info] |24 |2018-11-27 13:12:58.976|24 |2018-11-27 13:12:58.976| > [info] +-----+-----------------------+-----+-----------------------+ > {code} > On the other hand, if we switch the ordering so that fooStream now happens > earlier based on the time range condition: > {code:java} > val query = fooStream > .withWatermark("fooTime", "5 seconds") > .join( > barStream.withWatermark("barTime", "5 seconds"), > expr(""" > barId = fooId AND > barTime >= fooTime AND > barTime <= fooTime + interval 5 seconds > """), > joinType = "leftOuter" > ) > .writeStream > .format("console") > .option("truncate", false) > .start(){code} > Some, not all, previously matched records (with even IDs) are emitted with > outer NULLs along with all unmatched records (with odd IDs): > {code:java} > [info] Batch: 1 > [info] +-----+-----------------------+-----+-----------------------+ > [info] |fooId|fooTime |barId|barTime | > [info] +-----+-----------------------+-----+-----------------------+ > [info] |0 |2018-11-27 13:26:11.463|0 |2018-11-27 13:26:11.463| > [info] |6 |2018-11-27 13:26:17.463|6 |2018-11-27 13:26:17.463| > [info] |10 |2018-11-27 13:26:21.463|10 |2018-11-27 13:26:21.463| > [info] |8 |2018-11-27 13:26:19.463|8 |2018-11-27 13:26:19.463| > [info] |2 |2018-11-27 13:26:13.463|2 |2018-11-27 13:26:13.463| > [info] |4 |2018-11-27 13:26:15.463|4 |2018-11-27 13:26:15.463| > [info] +-----+-----------------------+-----+-----------------------+ > [info] Batch: 2 > [info] 
+-----+-----------------------+-----+-----------------------+ > [info] |fooId|fooTime |barId|barTime | > [info] +-----+-----------------------+-----+-----------------------+ > [info] |12 |2018-11-27 13:26:23.463|12 |2018-11-27 13:26:23.463| > [info] |18 |2018-11-27 13:26:29.463|18 |2018-11-27 13:26:29.463| > [info] |14 |2018-11-27 13:26:25.463|14 |2018-11-27 13:26:25.463| > [info] |20 |2018-11-27 13:26:31.463|20 |2018-11-27 13:26:31.463| > [info] |16 |2018-11-27 13:26:27.463|16 |2018-11-27 13:26:27.463| > [info] +-----+-----------------------+-----+-----------------------+ > [info] Batch: 3 > [info] +-----+-----------------------+-----+-----------------------+ > [info] |fooId|fooTime |barId|barTime | > [info] +-----+-----------------------+-----+-----------------------+ > [info] |26 |2018-11-27 13:26:37.463|26 |2018-11-27 13:26:37.463| > [info] |0 |2018-11-27 13:26:11.463|null |null | > [info] |22 |2018-11-27 13:26:33.463|22 |2018-11-27 13:26:33.463| > [info] |7 |2018-11-27 13:26:18.463|null |null | > [info] |9 |2018-11-27 13:26:20.463|null |null | > [info] |28 |2018-11-27 13:26:39.463|28 |2018-11-27 13:26:39.463| > [info] |5 |2018-11-27 13:26:16.463|null |null | > [info] |1 |2018-11-27 13:26:12.463|null |null | > [info] |3 |2018-11-27 13:26:14.463|null |null | > [info] |2 |2018-11-27 13:26:13.463|null |null | > [info] |4 |2018-11-27 13:26:15.463|null |null | > [info] |30 |2018-11-27 13:26:41.463|30 |2018-11-27 13:26:41.463| > [info] |24 |2018-11-27 13:26:35.463|24 |2018-11-27 13:26:35.463| > [info] +-----+-----------------------+-----+-----------------------+ > [info] Batch: 4 > [info] +-----+-----------------------+-----+-----------------------+ > [info] |fooId|fooTime |barId|barTime | > [info] +-----+-----------------------+-----+-----------------------+ > [info] |19 |2018-11-27 13:26:30.463|null |null | > [info] |34 |2018-11-27 13:26:45.463|34 |2018-11-27 13:26:45.463| > [info] |32 |2018-11-27 13:26:43.463|32 |2018-11-27 13:26:43.463| > [info] |17 |2018-11-27 
13:26:28.463|null |null | > [info] |10 |2018-11-27 13:26:21.463|null |null | > [info] |12 |2018-11-27 13:26:23.463|null |null | > [info] |11 |2018-11-27 13:26:22.463|null |null | > [info] |13 |2018-11-27 13:26:24.463|null |null | > [info] |36 |2018-11-27 13:26:47.463|36 |2018-11-27 13:26:47.463| > [info] |14 |2018-11-27 13:26:25.463|null |null | > [info] |15 |2018-11-27 13:26:26.463|null |null | > [info] |38 |2018-11-27 13:26:49.463|38 |2018-11-27 13:26:49.463| > [info] |40 |2018-11-27 13:26:51.463|40 |2018-11-27 13:26:51.463| > [info] +-----+-----------------------+-----+-----------------------+ > {code} > h6. > h6. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org