[ 
https://issues.apache.org/jira/browse/SPARK-26187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16748604#comment-16748604
 ] 

Jungtaek Lim commented on SPARK-26187:
--------------------------------------

I think I can reproduce this issue against the master branch.

{code}
val rateStream = spark.readStream
 .format("rate")
 .option("rowsPerSecond", 1)
 .option("numPartitions", 1)
 .load()

import org.apache.spark.sql.functions._

val fooStream = rateStream
 .select(col("value").as("fooId"), col("timestamp").as("fooTime"))

val barStream = rateStream
 // Introduce misses for ease of debugging
 .where(col("value") % 2 === 0)
 .select(col("value").as("barId"), col("timestamp").as("barTime"))

val query = fooStream
 .withWatermark("fooTime", "5 seconds")
 .join(
   barStream.withWatermark("barTime", "5 seconds"),
   expr("""
     barId = fooId AND
     barTime >= fooTime AND
     barTime <= fooTime + interval 5 seconds
        """),
   joinType = "leftOuter"
 )
 .writeStream
 .format("console")
 .option("truncate", false)
 .option("numRows", 10000)
 .start()
{code}

{noformat}
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----------------------+-----+-----------------------+
|fooId|fooTime                |barId|barTime                |
+-----+-----------------------+-----+-----------------------+
|26   |2019-01-22 14:44:03.885|26   |2019-01-22 14:44:03.885|
|54   |2019-01-22 14:44:31.885|54   |2019-01-22 14:44:31.885|
|0    |2019-01-22 14:43:37.885|0    |2019-01-22 14:43:37.885|
|22   |2019-01-22 14:43:59.885|22   |2019-01-22 14:43:59.885|
|34   |2019-01-22 14:44:11.885|34   |2019-01-22 14:44:11.885|
|50   |2019-01-22 14:44:27.885|50   |2019-01-22 14:44:27.885|
|32   |2019-01-22 14:44:09.885|32   |2019-01-22 14:44:09.885|
|6    |2019-01-22 14:43:43.885|6    |2019-01-22 14:43:43.885|
|68   |2019-01-22 14:44:45.885|68   |2019-01-22 14:44:45.885|
|72   |2019-01-22 14:44:49.885|72   |2019-01-22 14:44:49.885|
|58   |2019-01-22 14:44:35.885|58   |2019-01-22 14:44:35.885|
|56   |2019-01-22 14:44:33.885|56   |2019-01-22 14:44:33.885|
|52   |2019-01-22 14:44:29.885|52   |2019-01-22 14:44:29.885|
|28   |2019-01-22 14:44:05.885|28   |2019-01-22 14:44:05.885|
|10   |2019-01-22 14:43:47.885|10   |2019-01-22 14:43:47.885|
|48   |2019-01-22 14:44:25.885|48   |2019-01-22 14:44:25.885|
|44   |2019-01-22 14:44:21.885|44   |2019-01-22 14:44:21.885|
|12   |2019-01-22 14:43:49.885|12   |2019-01-22 14:43:49.885|
|74   |2019-01-22 14:44:51.885|74   |2019-01-22 14:44:51.885|
|8    |2019-01-22 14:43:45.885|8    |2019-01-22 14:43:45.885|
|62   |2019-01-22 14:44:39.885|62   |2019-01-22 14:44:39.885|
|2    |2019-01-22 14:43:39.885|2    |2019-01-22 14:43:39.885|
|66   |2019-01-22 14:44:43.885|66   |2019-01-22 14:44:43.885|
|4    |2019-01-22 14:43:41.885|4    |2019-01-22 14:43:41.885|
|36   |2019-01-22 14:44:13.885|36   |2019-01-22 14:44:13.885|
|18   |2019-01-22 14:43:55.885|18   |2019-01-22 14:43:55.885|
|14   |2019-01-22 14:43:51.885|14   |2019-01-22 14:43:51.885|
|38   |2019-01-22 14:44:15.885|38   |2019-01-22 14:44:15.885|
|30   |2019-01-22 14:44:07.885|30   |2019-01-22 14:44:07.885|
|42   |2019-01-22 14:44:19.885|42   |2019-01-22 14:44:19.885|
|46   |2019-01-22 14:44:23.885|46   |2019-01-22 14:44:23.885|
|20   |2019-01-22 14:43:57.885|20   |2019-01-22 14:43:57.885|
|70   |2019-01-22 14:44:47.885|70   |2019-01-22 14:44:47.885|
|60   |2019-01-22 14:44:37.885|60   |2019-01-22 14:44:37.885|
|40   |2019-01-22 14:44:17.885|40   |2019-01-22 14:44:17.885|
|16   |2019-01-22 14:43:53.885|16   |2019-01-22 14:43:53.885|
|64   |2019-01-22 14:44:41.885|64   |2019-01-22 14:44:41.885|
|24   |2019-01-22 14:44:01.885|24   |2019-01-22 14:44:01.885|
+-----+-----------------------+-----+-----------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----------------------+-----+-----------------------+
|fooId|fooTime                |barId|barTime                |
+-----+-----------------------+-----+-----------------------+
|29   |2019-01-22 14:44:06.885|null |null                   |
|19   |2019-01-22 14:43:56.885|null |null                   |
|112  |2019-01-22 14:45:29.885|112  |2019-01-22 14:45:29.885|
|7    |2019-01-22 14:43:44.885|null |null                   |
|94   |2019-01-22 14:45:11.885|94   |2019-01-22 14:45:11.885|
|110  |2019-01-22 14:45:27.885|110  |2019-01-22 14:45:27.885|
|57   |2019-01-22 14:44:34.885|null |null                   |
|43   |2019-01-22 14:44:20.885|null |null                   |
|84   |2019-01-22 14:45:01.885|84   |2019-01-22 14:45:01.885|
|31   |2019-01-22 14:44:08.885|null |null                   |
|98   |2019-01-22 14:45:15.885|98   |2019-01-22 14:45:15.885|
|39   |2019-01-22 14:44:16.885|null |null                   |
|25   |2019-01-22 14:44:02.885|null |null                   |
|9    |2019-01-22 14:43:46.885|null |null                   |
|63   |2019-01-22 14:44:40.885|null |null                   |
|27   |2019-01-22 14:44:04.885|null |null                   |
|51   |2019-01-22 14:44:28.885|null |null                   |
|17   |2019-01-22 14:43:54.885|null |null                   |
|41   |2019-01-22 14:44:18.885|null |null                   |
|88   |2019-01-22 14:45:05.885|88   |2019-01-22 14:45:05.885|
|33   |2019-01-22 14:44:10.885|null |null                   |
|114  |2019-01-22 14:45:31.885|114  |2019-01-22 14:45:31.885|
|5    |2019-01-22 14:43:42.885|null |null                   |
|1    |2019-01-22 14:43:38.885|null |null                   |
|96   |2019-01-22 14:45:13.885|96   |2019-01-22 14:45:13.885|
|104  |2019-01-22 14:45:21.885|104  |2019-01-22 14:45:21.885|
|106  |2019-01-22 14:45:23.885|106  |2019-01-22 14:45:23.885|
|100  |2019-01-22 14:45:17.885|100  |2019-01-22 14:45:17.885|
|108  |2019-01-22 14:45:25.885|108  |2019-01-22 14:45:25.885|
|61   |2019-01-22 14:44:38.885|null |null                   |
|3    |2019-01-22 14:43:40.885|null |null                   |
|37   |2019-01-22 14:44:14.885|null |null                   |
|55   |2019-01-22 14:44:32.885|null |null                   |
|49   |2019-01-22 14:44:26.885|null |null                   |
|11   |2019-01-22 14:43:48.885|null |null                   |
|35   |2019-01-22 14:44:12.885|null |null                   |
|80   |2019-01-22 14:44:57.885|80   |2019-01-22 14:44:57.885|
|76   |2019-01-22 14:44:53.885|76   |2019-01-22 14:44:53.885|
|102  |2019-01-22 14:45:19.885|102  |2019-01-22 14:45:19.885|
|92   |2019-01-22 14:45:09.885|92   |2019-01-22 14:45:09.885|
|13   |2019-01-22 14:43:50.885|null |null                   |
|78   |2019-01-22 14:44:55.885|78   |2019-01-22 14:44:55.885|
|21   |2019-01-22 14:43:58.885|null |null                   |
|59   |2019-01-22 14:44:36.885|null |null                   |
|15   |2019-01-22 14:43:52.885|null |null                   |
|82   |2019-01-22 14:44:59.885|82   |2019-01-22 14:44:59.885|
|90   |2019-01-22 14:45:07.885|90   |2019-01-22 14:45:07.885|
|23   |2019-01-22 14:44:00.885|null |null                   |
|86   |2019-01-22 14:45:03.885|86   |2019-01-22 14:45:03.885|
|53   |2019-01-22 14:44:30.885|null |null                   |
|47   |2019-01-22 14:44:24.885|null |null                   |
|45   |2019-01-22 14:44:22.885|null |null                   |
+-----+-----------------------+-----+-----------------------+


-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----------------------+-----+-----------------------+
|fooId|fooTime                |barId|barTime                |
+-----+-----------------------+-----+-----------------------+
|65   |2019-01-22 14:44:42.885|null |null                   |
|130  |2019-01-22 14:45:47.885|130  |2019-01-22 14:45:47.885|
|77   |2019-01-22 14:44:54.885|null |null                   |
|126  |2019-01-22 14:45:43.885|126  |2019-01-22 14:45:43.885|
|136  |2019-01-22 14:45:53.885|136  |2019-01-22 14:45:53.885|
|144  |2019-01-22 14:46:01.885|144  |2019-01-22 14:46:01.885|
|116  |2019-01-22 14:45:33.885|116  |2019-01-22 14:45:33.885|
|124  |2019-01-22 14:45:41.885|124  |2019-01-22 14:45:41.885|
|95   |2019-01-22 14:45:12.885|null |null                   |
|71   |2019-01-22 14:44:48.885|null |null                   |
|68   |2019-01-22 14:44:45.885|null |null                   |
|87   |2019-01-22 14:45:04.885|null |null                   |
|150  |2019-01-22 14:46:07.885|150  |2019-01-22 14:46:07.885|
|146  |2019-01-22 14:46:03.885|146  |2019-01-22 14:46:03.885|
|79   |2019-01-22 14:44:56.885|null |null                   |
|103  |2019-01-22 14:45:20.885|null |null                   |
|138  |2019-01-22 14:45:55.885|138  |2019-01-22 14:45:55.885|
|120  |2019-01-22 14:45:37.885|120  |2019-01-22 14:45:37.885|
|89   |2019-01-22 14:45:06.885|null |null                   |
|128  |2019-01-22 14:45:45.885|128  |2019-01-22 14:45:45.885|
|85   |2019-01-22 14:45:02.885|null |null                   |
|67   |2019-01-22 14:44:44.885|null |null                   |
|142  |2019-01-22 14:45:59.885|142  |2019-01-22 14:45:59.885|
|132  |2019-01-22 14:45:49.885|132  |2019-01-22 14:45:49.885|
|101  |2019-01-22 14:45:18.885|null |null                   |
|83   |2019-01-22 14:45:00.885|null |null                   |
|154  |2019-01-22 14:46:11.885|154  |2019-01-22 14:46:11.885|
|66   |2019-01-22 14:44:43.885|null |null                   |
|134  |2019-01-22 14:45:51.885|134  |2019-01-22 14:45:51.885|
|118  |2019-01-22 14:45:35.885|118  |2019-01-22 14:45:35.885|
|75   |2019-01-22 14:44:52.885|null |null                   |
|140  |2019-01-22 14:45:57.885|140  |2019-01-22 14:45:57.885|
|69   |2019-01-22 14:44:46.885|null |null                   |
|122  |2019-01-22 14:45:39.885|122  |2019-01-22 14:45:39.885|
|81   |2019-01-22 14:44:58.885|null |null                   |
|97   |2019-01-22 14:45:14.885|null |null                   |
|73   |2019-01-22 14:44:50.885|null |null                   |
|99   |2019-01-22 14:45:16.885|null |null                   |
|93   |2019-01-22 14:45:10.885|null |null                   |
|64   |2019-01-22 14:44:41.885|null |null                   |
|152  |2019-01-22 14:46:09.885|152  |2019-01-22 14:46:09.885|
|91   |2019-01-22 14:45:08.885|null |null                   |
|148  |2019-01-22 14:46:05.885|148  |2019-01-22 14:46:05.885|
+-----+-----------------------+-----+-----------------------+
{noformat}

fooIds 64, 66, and 68 are matched against barIds 64, 66, and 68 in batch 1, but are 
also emitted with null matches in batch 3.

> Stream-stream left outer join returns outer nulls for already matched rows
> --------------------------------------------------------------------------
>
>                 Key: SPARK-26187
>                 URL: https://issues.apache.org/jira/browse/SPARK-26187
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Pavel Chernikov
>            Priority: Major
>
> This is basically the same issue as SPARK-26154, but with a slightly easier to 
> reproduce and more concrete example:
> {code:java}
> val rateStream = session.readStream
>  .format("rate")
>  .option("rowsPerSecond", 1)
>  .option("numPartitions", 1)
>  .load()
> import org.apache.spark.sql.functions._
> val fooStream = rateStream
>  .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
> val barStream = rateStream
>  // Introduce misses for ease of debugging
>  .where(col("value") % 2 === 0)
>  .select(col("value").as("barId"), col("timestamp").as("barTime")){code}
> If barStream is configured to happen earlier than fooStream, based on the time 
> range condition, then everything works correctly: no previously matched records 
> are flushed with outer NULLs:
> {code:java}
> val query = fooStream
>  .withWatermark("fooTime", "5 seconds")
>  .join(
>    barStream.withWatermark("barTime", "5 seconds"),
>    expr("""
>      barId = fooId AND
>      fooTime >= barTime AND
>      fooTime <= barTime + interval 5 seconds
>         """),
>    joinType = "leftOuter"
>  )
>  .writeStream
>  .format("console")
>  .option("truncate", false)
>  .start(){code}
> It's easy to observe that only odd rows are flushed with NULLs on the right:
> {code:java}
> [info] Batch: 1 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |0    |2018-11-27 13:12:34.976|0    |2018-11-27 13:12:34.976| 
> [info] |6    |2018-11-27 13:12:40.976|6    |2018-11-27 13:12:40.976| 
> [info] |10   |2018-11-27 13:12:44.976|10   |2018-11-27 13:12:44.976| 
> [info] |8    |2018-11-27 13:12:42.976|8    |2018-11-27 13:12:42.976| 
> [info] |2    |2018-11-27 13:12:36.976|2    |2018-11-27 13:12:36.976| 
> [info] |4    |2018-11-27 13:12:38.976|4    |2018-11-27 13:12:38.976| 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] Batch: 2 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |1    |2018-11-27 13:12:35.976|null |null                   | 
> [info] |3    |2018-11-27 13:12:37.976|null |null                   | 
> [info] |12   |2018-11-27 13:12:46.976|12   |2018-11-27 13:12:46.976| 
> [info] |18   |2018-11-27 13:12:52.976|18   |2018-11-27 13:12:52.976| 
> [info] |14   |2018-11-27 13:12:48.976|14   |2018-11-27 13:12:48.976| 
> [info] |20   |2018-11-27 13:12:54.976|20   |2018-11-27 13:12:54.976| 
> [info] |16   |2018-11-27 13:12:50.976|16   |2018-11-27 13:12:50.976| 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] Batch: 3 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |26   |2018-11-27 13:13:00.976|26   |2018-11-27 13:13:00.976| 
> [info] |22   |2018-11-27 13:12:56.976|22   |2018-11-27 13:12:56.976| 
> [info] |7    |2018-11-27 13:12:41.976|null |null                   | 
> [info] |9    |2018-11-27 13:12:43.976|null |null                   | 
> [info] |28   |2018-11-27 13:13:02.976|28   |2018-11-27 13:13:02.976| 
> [info] |5    |2018-11-27 13:12:39.976|null |null                   | 
> [info] |11   |2018-11-27 13:12:45.976|null |null                   | 
> [info] |13   |2018-11-27 13:12:47.976|null |null                   | 
> [info] |24   |2018-11-27 13:12:58.976|24   |2018-11-27 13:12:58.976| 
> [info] +-----+-----------------------+-----+-----------------------+
> {code}
> On the other hand, if we switch the ordering so that fooStream happens 
> earlier based on the time range condition:
> {code:java}
> val query = fooStream
>  .withWatermark("fooTime", "5 seconds")
>  .join(
>    barStream.withWatermark("barTime", "5 seconds"),
>    expr("""
>      barId = fooId AND
>      barTime >= fooTime AND
>      barTime <= fooTime + interval 5 seconds
>         """),
>    joinType = "leftOuter"
>  )
>  .writeStream
>  .format("console")
>  .option("truncate", false)
>  .start(){code}
> Some, but not all, previously matched records (with even IDs) are emitted with 
> outer NULLs along with all unmatched records (with odd IDs): 
> {code:java}
> [info] Batch: 1 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |0    |2018-11-27 13:26:11.463|0    |2018-11-27 13:26:11.463| 
> [info] |6    |2018-11-27 13:26:17.463|6    |2018-11-27 13:26:17.463| 
> [info] |10   |2018-11-27 13:26:21.463|10   |2018-11-27 13:26:21.463| 
> [info] |8    |2018-11-27 13:26:19.463|8    |2018-11-27 13:26:19.463| 
> [info] |2    |2018-11-27 13:26:13.463|2    |2018-11-27 13:26:13.463| 
> [info] |4    |2018-11-27 13:26:15.463|4    |2018-11-27 13:26:15.463| 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] Batch: 2 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |12   |2018-11-27 13:26:23.463|12   |2018-11-27 13:26:23.463| 
> [info] |18   |2018-11-27 13:26:29.463|18   |2018-11-27 13:26:29.463| 
> [info] |14   |2018-11-27 13:26:25.463|14   |2018-11-27 13:26:25.463| 
> [info] |20   |2018-11-27 13:26:31.463|20   |2018-11-27 13:26:31.463| 
> [info] |16   |2018-11-27 13:26:27.463|16   |2018-11-27 13:26:27.463| 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] Batch: 3
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |26   |2018-11-27 13:26:37.463|26   |2018-11-27 13:26:37.463| 
> [info] |0    |2018-11-27 13:26:11.463|null |null                   | 
> [info] |22   |2018-11-27 13:26:33.463|22   |2018-11-27 13:26:33.463| 
> [info] |7    |2018-11-27 13:26:18.463|null |null                   | 
> [info] |9    |2018-11-27 13:26:20.463|null |null                   | 
> [info] |28   |2018-11-27 13:26:39.463|28   |2018-11-27 13:26:39.463| 
> [info] |5    |2018-11-27 13:26:16.463|null |null                   | 
> [info] |1    |2018-11-27 13:26:12.463|null |null                   | 
> [info] |3    |2018-11-27 13:26:14.463|null |null                   | 
> [info] |2    |2018-11-27 13:26:13.463|null |null                   | 
> [info] |4    |2018-11-27 13:26:15.463|null |null                   | 
> [info] |30   |2018-11-27 13:26:41.463|30   |2018-11-27 13:26:41.463| 
> [info] |24   |2018-11-27 13:26:35.463|24   |2018-11-27 13:26:35.463| 
> [info] +-----+-----------------------+-----+-----------------------+
> [info] Batch: 4 
> [info] +-----+-----------------------+-----+-----------------------+
> [info] |fooId|fooTime                |barId|barTime                | 
> [info] +-----+-----------------------+-----+-----------------------+ 
> [info] |19   |2018-11-27 13:26:30.463|null |null                   | 
> [info] |34   |2018-11-27 13:26:45.463|34   |2018-11-27 13:26:45.463|
> [info] |32   |2018-11-27 13:26:43.463|32   |2018-11-27 13:26:43.463|
> [info] |17   |2018-11-27 13:26:28.463|null |null                   |
> [info] |10   |2018-11-27 13:26:21.463|null |null                   |
> [info] |12   |2018-11-27 13:26:23.463|null |null                   |
> [info] |11   |2018-11-27 13:26:22.463|null |null                   |
> [info] |13   |2018-11-27 13:26:24.463|null |null                   |
> [info] |36   |2018-11-27 13:26:47.463|36   |2018-11-27 13:26:47.463|
> [info] |14   |2018-11-27 13:26:25.463|null |null                   |
> [info] |15   |2018-11-27 13:26:26.463|null |null                   |
> [info] |38   |2018-11-27 13:26:49.463|38   |2018-11-27 13:26:49.463|
> [info] |40   |2018-11-27 13:26:51.463|40   |2018-11-27 13:26:51.463|
> [info] +-----+-----------------------+-----+-----------------------+
> {code}
> h6.  
> h6.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to