Hi,

*Usage Info*:
- Beam: 2.16.0
- Spark: 2.4.2
- Spark is running on Kubernetes.
- We are using the Spark Streaming (legacy) runner with the Beam Java SDK.
- The pipeline runs with default configurations, i.e. the defaults for SparkPipelineOptions.
*Issue*:
When a Beam pipeline is submitted to Spark, there is latency in getting the results of an aggregation, and this latency increases with every new window. The pipeline runs with 1-minute fixed windows and the default trigger (fire when the watermark crosses the end of the window). The same pipeline works as expected on the Flink and Direct runners.

*Initial Analysis*:
The watermarks for a batch (500 ms by default) are broadcast when the onBatchCompleted() event [1] for that batch is triggered. We are observing latency between the time a batch completes and the time onBatchCompleted() is triggered for it. For example, if a batch completes at 09:29:30 (HH:MM:SS) and the onBatchCompleted() event for it is triggered at 09:29:35, there is a 5-second delay. We determined the batch completion times from the Spark driver UI and driver logs. I have asked about this latency on the Spark user mailing list [2] and am waiting for a response.

The watermarks for each batch are added to a queue. When the onBatchCompleted() event is triggered for any batch, the watermarks are polled from the queue and broadcast [3]. Because of the delay between batch completion and the onBatchCompleted() event, there is a delay in advancing the watermarks, which in turn delays emitting the aggregation results. As the pipeline progresses, the rate at which watermarks are added to the queue is higher than the rate at which they are polled and broadcast, so the latency between batch completion and the onBatchCompleted() event keeps increasing.
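To make the queue dynamics above concrete, here is a minimal, self-contained simulation (a hypothetical model, not Beam's actual GlobalWatermarkHolder code): watermarks are enqueued once per 500 ms batch, but listener events are assumed to fire slightly slower, so the backlog, and with it the watermark lag, grows without bound.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical simulation of the queue-based watermark hand-off described
// above. One watermark is enqueued per batch; a listener event polls one
// watermark when it fires. If events fire slower than batches complete,
// the queue backlog and the watermark lag grow monotonically.
public class WatermarkLagSim {

    /** Returns the watermark lag (ms) after simulating the given number of batches. */
    static long lagAfter(int batches, long batchIntervalMs, long listenerDelayMs) {
        Queue<Long> pending = new ArrayDeque<>();
        long broadcastWatermark = 0;          // last watermark actually broadcast
        long nextEventTime = listenerDelayMs; // when the next listener event fires
        long now = 0;
        for (int b = 1; b <= batches; b++) {
            now = b * batchIntervalMs;
            pending.add(now); // watermark queued when the batch completes
            // Fire all listener events that are due by now; each poll advances the watermark.
            while (nextEventTime <= now && !pending.isEmpty()) {
                broadcastWatermark = pending.poll();
                nextEventTime += listenerDelayMs;
            }
        }
        return now - broadcastWatermark;
    }

    public static void main(String[] args) {
        // Assumed numbers: 500 ms batch interval, events fire every 600 ms.
        for (int n : new int[] {10, 100, 1000}) {
            System.out.println("after " + n + " batches, watermark lag = "
                + lagAfter(n, 500, 600) + " ms");
        }
    }
}
```

With equal production and consumption rates (listenerDelayMs == batchIntervalMs) the lag stays bounded; any sustained gap between the two rates makes it grow linearly, which matches the behavior we observe as the pipeline progresses.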
*Logs*:
These are trimmed logs showing the issue in action:

*11:37:06*: INFO: scheduler.JobScheduler: "Finished job streaming job *1584099237500* ms.4 from job set of time 1584099237500 ms"
*11:37:06*: INFO: scheduler.JobScheduler: "Total delay: 189.352 s for time *1584099237500* ms (execution: 0.942 s)"
*11:40:30*: INFO: util.GlobalWatermarkHolder: "Put new watermark block: {0=SparkWatermarks{lowWatermark=2020-03-13T11:37:03.621Z, highWatermark=2020-03-13T11:37:04.882Z, synchronizedProcessingTime=2020-03-13T11:33:57.500Z}, 1=SparkWatermarks{lowWatermark=2020-03-13T11:37:03.872Z, highWatermark=2020-03-13T11:37:05.107Z, synchronizedProcessingTime=2020-03-13T11:33:57.500Z}, 2=SparkWatermarks{lowWatermark=2020-03-13T11:37:04.204Z, highWatermark=2020-03-13T11:37:05.377Z, synchronizedProcessingTime=2020-03-13T11:33:57.500Z}}"
*11:40:30*: INFO: util.GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: "Batch with timestamp: *1584099237500* has completed, watermarks have been updated."

As you can see, there is almost a *3 minute 24 second* delay between the time the batch completed and the time the onBatchCompleted() event for it was triggered (i.e., when the watermarks were advanced).

Has anyone faced this issue before? What factors can contribute to this latency? Thanks for any pointers to debug the issue.

[1] https://github.com/apache/beam/blob/de30361359b70e9fe9729f0f3d52f6c6e8462cfb/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java#L363
[2] http://apache-spark-user-list.1001560.n3.nabble.com/Latency-between-Batch-Completion-and-triggering-of-onBatchCompleted-event-tc37086.html
[3] https://github.com/apache/beam/blob/de30361359b70e9fe9729f0f3d52f6c6e8462cfb/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java#L208

Regards,
Rahul