Hi,

We have run into an issue with the watermark in SparkRunner.

*Pipeline*: read from two Kafka sources => apply a fixed window of 1 minute
to both PCollections => apply SqlTransform with the query "select
c.datetime, c.country, s.name, s.id from `kafka_source1` as s join
`kafka_source2` as c on s.name = c.name" => write the emitted output to a
Kafka sink

We are using the watermark policy from CustomTimestampPolicyWithLimitedDelay
(https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74)
with maxDelay set to 0.
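
As we read the comments in that class, the per-partition watermark is the
largest event timestamp seen so far minus maxDelay, except when that
timestamp is ahead of the current time (then it is now minus maxDelay) or
the partition is idle. The sketch below is a simplified, standalone model
of that rule in java.time, not the Beam class itself: the class name is
hypothetical, the idle-partition case is left out, and the sample times are
assumed to be UTC. With maxDelay = 0 the watermark should simply follow the
newest event timestamp.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model of a "max event time minus maxDelay"
 * watermark rule. The idle-partition handling of the real Beam policy
 * is intentionally omitted.
 */
final class LimitedDelayWatermarkSketch {
    private final Duration maxDelay;
    private Instant maxEventTimestamp = Instant.EPOCH;

    LimitedDelayWatermarkSketch(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    /** Records an element's event time and returns it unchanged. */
    Instant observe(Instant eventTime) {
        if (eventTime.isAfter(maxEventTimestamp)) {
            maxEventTimestamp = eventTime;
        }
        return eventTime;
    }

    /** Watermark for this partition given the current wall-clock time. */
    Instant watermark(Instant now) {
        if (maxEventTimestamp.isAfter(now)) {
            // Event time is ahead of the clock: cap at now - maxDelay.
            return now.minus(maxDelay);
        }
        return maxEventTimestamp.minus(maxDelay);
    }

    public static void main(String[] args) {
        LimitedDelayWatermarkSketch policy =
                new LimitedDelayWatermarkSketch(Duration.ZERO);
        policy.observe(Instant.parse("2019-09-05T12:01:19.481704Z"));
        policy.observe(Instant.parse("2019-09-05T12:01:21.493494Z"));
        // With maxDelay = 0 the watermark equals the newest event time seen.
        System.out.println(policy.watermark(Instant.parse("2019-09-05T12:01:30Z")));
        // prints 2019-09-05T12:01:21.493494Z
    }
}
```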

Since we apply a fixed window of 1 minute and the element timestamps are
monotonically increasing, we expect the output of the window ending at
12:02:00 to be emitted once the current processing time passes 12:02:00,
with a reasonable delay (say 10 seconds). Instead, we get the result of the
window only after a long delay.
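
The expectation above rests on fixed-window arithmetic: with 1-minute
windows aligned to the epoch, an element stamped 12:01:19 falls into
[12:01:00, 12:02:00), so its result should appear once the watermark passes
12:02:00. A small java.time sketch of that arithmetic, assuming a zero window
offset and UTC timestamps (the helper class is hypothetical, not a Beam API):

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: fixed (tumbling) window bounds for an event time. */
final class FixedWindowMath {
    /** Start of the epoch-aligned window of the given size that contains ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMillis = size.toMillis();
        long start = Math.floorDiv(ts.toEpochMilli(), sizeMillis) * sizeMillis;
        return Instant.ofEpochMilli(start);
    }

    /** Exclusive end of that window; output is expected once the watermark passes it. */
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        Instant event = Instant.parse("2019-09-05T12:01:19.481704Z");
        System.out.println(windowStart(event, oneMinute) + " .. " + windowEnd(event, oneMinute));
        // prints 2019-09-05T12:01:00Z .. 2019-09-05T12:02:00Z
    }
}
```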

In the Spark logs, the watermark appears to be lagging behind.
Here are the relevant log lines:
19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
{0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
   highWatermark=2019-09-05T11:57:06.302Z,
   synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
   highWatermark=2019-09-05T11:57:06.686Z,
   synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
19/09/05 12:02:50 INFO GlobalWatermarkHolder$WatermarkAdvancingStreamingListener:
Batch with timestamp: 1567684500500 has completed, watermarks have been updated.

As you can see, at processing time 12:02:50 the highWatermark is only
11:57:06, and as processing time advances, the gap between processing time
and highWatermark keeps growing.
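
The size of the gap can be read straight off the log: 12:02:50 minus
11:57:06.302 is 5m43.698s. The snippet below only redoes that arithmetic and
decodes the batch timestamp 1567684500500, which corresponds to
2019-09-05T11:55:00.500Z, the same value as synchronizedProcessingTime. It
assumes the log clock is UTC (the log prefix carries no zone), and the class
name is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper that redoes the arithmetic on the log lines (log clock assumed UTC). */
final class WatermarkLag {
    /** How far the watermark trails the given processing time. */
    static Duration lag(Instant processingTime, Instant watermark) {
        return Duration.between(watermark, processingTime);
    }

    public static void main(String[] args) {
        Instant logTime = Instant.parse("2019-09-05T12:02:50Z");
        Instant highWatermark = Instant.parse("2019-09-05T11:57:06.302Z");
        // Batch timestamp from the WatermarkAdvancingStreamingListener line, in epoch millis.
        Instant batchTime = Instant.ofEpochMilli(1567684500500L);

        System.out.println(batchTime);                             // 2019-09-05T11:55:00.500Z
        System.out.println(lag(logTime, highWatermark));           // PT5M43.698S
        // Time between the batch timestamp and the log line that reports its completion.
        System.out.println(Duration.between(batchTime, logTime));  // PT7M49.5S
    }
}
```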

We ran the same pipeline with the same data on the Flink runner and the
Direct runner and did not see this issue. On those runners, the watermark
stays almost equal to processing time.

Sample input data:

kafka_source1:
value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}

kafka_source2:
value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05 12-01-26 704060'}
value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05 12-01-27 712300'}
value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05 12-01-28 713951'}
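
The datetime field uses hyphens in the time part and microseconds after a
space, so the timestamp function has to parse it with a matching pattern.
A minimal sketch of such a parser, for illustration only: the class name is
hypothetical, and the values are assumed to be UTC because the strings carry
no zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical parser for values such as "2019-09-05 12-01-19 481704". */
final class SampleTimestampParser {
    // Date, then time with '-' separators, then microseconds after a space.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss SSSSSS");

    /** Parses the 'datetime' field, assuming it is in UTC. */
    static Instant parse(String datetime) {
        return LocalDateTime.parse(datetime, FORMAT).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(parse("2019-09-05 12-01-19 481704"));
        // prints 2019-09-05T12:01:19.481704Z
    }
}
```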

What could be causing this?

Regards,
shanta
