Hi,

I am facing this issue too. +dev <dev@beam.apache.org>

Here is the pipeline that we are using (providing a very simple pipeline to
highlight the issue):

KafkaSource -> SqlTransform -> KafkaSink
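For reference, here is a minimal sketch of how we wire this up. The broker
address, topic names, and the parseTimestamp() helper are made up for
illustration (our real code differs), runner options are omitted, and the
write to the KafkaSink topic is elided:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WatermarkRepro {

  // Hypothetical helper: extracts the second CSV column
  // ("2019-09-10 11:36:42") and converts it to a joda Instant
  // used as the element timestamp.
  private static Instant parseTimestamp(String value) {
    String ts = value.split(",")[1].trim().replace("\"", "");
    return Instant.parse(ts.replace(" ", "T"));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();  // runner options omitted
    Schema schema = Schema.builder().addStringField("strCol").build();

    PCollection<Row> rows = p
        .apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")  // hypothetical broker
            .withTopic("input_topic")                // hypothetical topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Element timestamp = second column of the record, maxDelay = 0.
            .withTimestampPolicyFactory((tp, previousWatermark) ->
                new CustomTimestampPolicyWithLimitedDelay<String, String>(
                    (KafkaRecord<String, String> rec) ->
                        parseTimestamp(rec.getKV().getValue()),
                    Duration.ZERO,
                    previousWatermark))
            .withoutMetadata())
        .apply(MapElements.into(TypeDescriptors.rows())
            .via((KV<String, String> kv) -> Row.withSchema(schema)
                .addValue(kv.getValue().split(",")[0].replace("\"", ""))
                .build()))
        .setRowSchema(schema);

    PCollection<Row> result = rows
        .apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(SqlTransform.query(
            "select strCol FROM PCOLLECTION GROUP BY strCol"));
    // Writing `result` back to the KafkaSink topic is elided here.

    p.run().waitUntilFinish();
  }
}

The timestamp policy here is the same CustomTimestampPolicyWithLimitedDelay
referenced in the quoted mail below, with maxDelay set to 0.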
We are reading from a single topic in KafkaSource with a single partition.
Here is the data that we are producing to the KafkaSource topic:

"str1", "2019-09-10 11:36:42"
"str2", "2019-09-10 11:36:44"
"str3", "2019-09-10 11:36:45"

The first column is named "strCol". The second column, i.e. the timestamp in
string format, is used as the timestamp of the element; it is the wall time
at which the record was generated. After publishing this data to the Kafka
topic we do not publish any more data, so the topic is idle after that. The
timestamps of the records are monotonically increasing.

SQL query: "select strCol FROM PCOLLECTION GROUP BY strCol"

Here is the result from KafkaSink:

{"strCol":{"string":"str1"}}
{"strCol":{"string":"str3"}}
{"strCol":{"string":"str2"}}

The expected result is written to KafkaSink correctly, *but with a delay*.

Here are the logs from the Spark driver:

...
19/09/10 12:12:42 INFO GlobalWatermarkHolder: Put new watermark block:
{0=SparkWatermarks{lowWatermark=2019-09-10T11:43:37.273Z,
highWatermark=2019-09-10T11:43:37.273Z,
synchronizedProcessingTime=2019-09-10T11:40:33.000Z}}
...
19/09/10 12:18:53 INFO GlobalWatermarkHolder: Put new watermark block:
{0=SparkWatermarks{lowWatermark=2019-09-10T11:44:54.238Z,
highWatermark=2019-09-10T11:44:54.238Z,
synchronizedProcessingTime=2019-09-10T11:41:17.000Z}}

As per the logs, when the processing time was 12:12:42 the highWatermark was
at 11:43:37, a delay of almost 30 minutes. And when the processing time was
12:18:53, the highWatermark was at 11:44:54. From these logs it seems that
the watermark is advancing very slowly.

Is there an integration test (IT) for SparkRunner with unbounded data and a
windowing aggregation? Is this a known bug?

(A sketch of the windowed join pipeline from the quoted mail is at the
bottom of this message.)

Thanks,
Rahul

On Thu, Sep 5, 2019 at 7:48 PM shanta chakpram <shantachakp...@gmail.com>
wrote:

> Hi,
>
> We have detected an issue with SparkRunner and watermarks.
>
> *Pipeline*: Read from two Kafka sources => apply a fixed window of
> duration 1 minute to both PCollections => apply SqlTransform with the
> query "select c.datetime, c.country, s.name, s.id from `kafka_source1`
> as s join `kafka_source2` as c on s.name = c.name" => write the emitted
> output to a Kafka sink.
>
> We are using the timestamp policy provided in
> https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74
> with maxDelay set to 0.
>
> As we have applied a fixed window of 1 minute duration, and as the
> elements' timestamps are monotonically increasing, we expect the output
> to be emitted once the current processing time crosses 12:02:00, with a
> reasonable delay (say 10 seconds). But we are getting the result of the
> window only after a long delay.
>
> From the Spark logs it seems that the watermark is lagging. Here are the
> logs:
>
> 19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
> highWatermark=2019-09-05T11:57:06.302Z,
> synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
> 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
> highWatermark=2019-09-05T11:57:06.686Z,
> synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
> 19/09/05 12:02:50 INFO
> GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with
> timestamp: 1567684500500 has completed, watermarks have been updated.
>
> As you can see, when the current processing time is 12:02:50, the
> highWatermark is 11:57:06.
> As the processing time progresses, the gap between the processing time
> and the highWatermark keeps increasing.
>
> We ran the same pipeline with the same data on the Flink Runner and the
> Direct Runner, and we did not see this issue there. On those runners the
> watermark stays almost equal to the processing time.
>
> Sample input data:
>
> kafka_source1:
> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
> value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}
>
> kafka_source2:
> value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05 12-01-26 704060'}
> value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05 12-01-27 712300'}
> value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05 12-01-28 713951'}
>
> What can be the issue here?
>
> Regards,
> shanta
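As mentioned above, here is a minimal sketch of the windowed join from the
quoted mail. Only the windowing and the SQL join wiring are shown; the
KafkaIO reads, row conversion, and timestamp policy (the same
CustomTimestampPolicyWithLimitedDelay with maxDelay = 0) are elided, and all
names other than the query itself are illustrative:

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

public class JoinSketch {

  // source1/source2 are the PCollection<Row>s read from kafka_source1 and
  // kafka_source2 (schemas {id, name, datetime} and {country, name,
  // datetime}); the reads and row conversion are as in the earlier sketch.
  static PCollection<Row> windowedJoin(PCollection<Row> source1,
                                       PCollection<Row> source2) {
    PCollection<Row> s = source1.apply("Window1",
        Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));
    PCollection<Row> c = source2.apply("Window2",
        Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));

    // SqlTransform resolves the table names in the query against the
    // TupleTag names, so the tags must match kafka_source1/kafka_source2.
    return PCollectionTuple
        .of(new TupleTag<Row>("kafka_source1"), s)
        .and(new TupleTag<Row>("kafka_source2"), c)
        .apply(SqlTransform.query(
            "select c.datetime, c.country, s.name, s.id"
                + " from `kafka_source1` as s join `kafka_source2` as c"
                + " on s.name = c.name"));
  }
}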