This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f5f7a471321 Merge pull request #26284: Fix GroupIntoBatches hold f5f7a471321 is described below commit f5f7a471321c903174b452a1982c5183a79ac6bc Author: Reuven Lax <re...@google.com> AuthorDate: Fri Apr 14 19:24:25 2023 -0700 Merge pull request #26284: Fix GroupIntoBatches hold --- .../java/org/apache/beam/sdk/transforms/GroupIntoBatches.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 311a3dac6ca..3616cc2e59f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -580,6 +580,7 @@ public class GroupIntoBatches<K, InputT> timerTs, minBufferedTs); bufferingTimer.clear(); + holdTimer.clear(); } } @@ -593,13 +594,18 @@ public class GroupIntoBatches<K, InputT> @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSizeBytes, @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs, @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> minBufferedTs, - @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer) { + @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer, + @TimerId(TIMER_HOLD_ID) Timer holdTimer) { LOG.debug( "*** END OF BUFFERING *** for timer timestamp {} with buffering duration {}", timestamp, maxBufferingDuration); flushBatch( receiver, key, batch, storedBatchSize, storedBatchSizeBytes, timerTs, minBufferedTs); + // Generally this is a noop, since holdTimer is not set if bufferingTimer is set. However we + // delete the holdTimer + // here in order to allow users to modify this policy on pipeline update. + holdTimer.clear(); } @OnWindowExpiration