This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 1591d3c Merge pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance 1591d3c is described below commit 1591d3cb3ee354fe53c31946d296eb890602b41d Author: reuvenlax <re...@google.com> AuthorDate: Fri Aug 6 13:30:20 2021 -0700 Merge pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance --- .../beam/sdk/transforms/GroupIntoBatches.java | 121 +++++++++++++++++---- 1 file changed, 99 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index f8840b5..f5fbf5d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -35,12 +35,14 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.joda.time.Duration; @@ -320,16 +322,19 @@ public class GroupIntoBatches<K, InputT> @TimerId(END_OF_WINDOW_ID) private final TimerSpec windowTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + // This timer expires when it's time to 
batch and output the buffered data. private static final String END_OF_BUFFERING_ID = "endOfBuffering"; @TimerId(END_OF_BUFFERING_ID) private final TimerSpec bufferingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + // The set of elements that will go in the next batch. private static final String BATCH_ID = "batch"; @StateId(BATCH_ID) private final StateSpec<BagState<InputT>> batchSpec; + // The size of the current batch. private static final String NUM_ELEMENTS_IN_BATCH_ID = "numElementsInBatch"; @StateId(NUM_ELEMENTS_IN_BATCH_ID) @@ -337,9 +342,24 @@ public class GroupIntoBatches<K, InputT> private static final String NUM_BYTES_IN_BATCH_ID = "numBytesInBatch"; + // The byte size of the current batch. @StateId(NUM_BYTES_IN_BATCH_ID) private final StateSpec<CombiningState<Long, long[], Long>> batchSizeBytesSpec; + private static final String TIMER_TIMESTAMP = "timerTs"; + + // The timestamp of the current active timer. + @StateId(TIMER_TIMESTAMP) + private final StateSpec<ValueState<Long>> timerTsSpec; + + // The minimum element timestamp currently buffered in the bag. This is used to set the output + // timestamp + // on the timer which ensures that the watermark correctly tracks the buffered elements. 
+ private static final String MIN_BUFFERED_TS = "minBufferedTs"; + + @StateId(MIN_BUFFERED_TS) + private final StateSpec<CombiningState<Long, long[], Long>> minBufferedTsSpec; + private final long prefetchFrequency; GroupIntoBatchesDoFn( @@ -369,8 +389,23 @@ public class GroupIntoBatches<K, InputT> } }; + Combine.BinaryCombineLongFn minCombineFn = + new Combine.BinaryCombineLongFn() { + @Override + public long identity() { + return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + } + + @Override + public long apply(long left, long right) { + return Math.min(left, right); + } + }; + this.batchSizeSpec = StateSpecs.combining(sumCombineFn); this.batchSizeBytesSpec = StateSpecs.combining(sumCombineFn); + this.timerTsSpec = StateSpecs.value(); + this.minBufferedTsSpec = StateSpecs.combining(minCombineFn); // Prefetch every 20% of batchSize elements. Do not prefetch if batchSize is too little this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5); @@ -378,17 +413,16 @@ public class GroupIntoBatches<K, InputT> @ProcessElement public void processElement( - @TimerId(END_OF_WINDOW_ID) Timer windowTimer, @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer, @StateId(BATCH_ID) BagState<InputT> batch, @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSize, @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSizeBytes, + @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs, + @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> minBufferedTs, @Element KV<K, InputT> element, + @Timestamp Instant elementTs, BoundedWindow window, OutputReceiver<KV<K, Iterable<InputT>>> receiver) { - Instant windowEnds = window.maxTimestamp().plus(allowedLateness); - LOG.debug("*** SET TIMER *** to point in time {} for window {}", windowEnds, window); - windowTimer.set(windowEnds); LOG.debug("*** BATCH *** Add element for window {} ", window); batch.add(element.getValue()); // Blind add is supported with 
combiningState @@ -398,11 +432,31 @@ public class GroupIntoBatches<K, InputT> storedBatchSizeBytes.readLater(); } - long num = storedBatchSize.read(); - if (maxBufferingDuration.isLongerThan(Duration.ZERO) && num == 1) { - // This is the first element in batch. Start counting buffering time if a limit was set. - bufferingTimer.offset(maxBufferingDuration).setRelative(); + long num; + if (maxBufferingDuration.isLongerThan(Duration.ZERO)) { + minBufferedTs.readLater(); + num = storedBatchSize.read(); + + long oldOutputTs = + MoreObjects.firstNonNull( + minBufferedTs.read(), BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + minBufferedTs.add(elementTs.getMillis()); + // If this is the first element in the batch or if the timer's output timestamp needs + // modifying, then set a + // timer. + if (num == 1 || minBufferedTs.read() != oldOutputTs) { + long targetTs = + MoreObjects.firstNonNull( + timerTs.read(), + bufferingTimer.getCurrentRelativeTime().getMillis() + + maxBufferingDuration.getMillis()); + bufferingTimer + .withOutputTimestamp(Instant.ofEpochMilli(minBufferedTs.read())) + .set(Instant.ofEpochMilli(targetTs)); + } } + num = storedBatchSize.read(); + if (num % prefetchFrequency == 0) { // Prefetch data and modify batch state (readLater() modifies this) batch.readLater(); @@ -411,14 +465,15 @@ public class GroupIntoBatches<K, InputT> if (num >= batchSize || (batchSizeBytes != Long.MAX_VALUE && storedBatchSizeBytes.read() >= batchSizeBytes)) { LOG.debug("*** END OF BATCH *** for window {}", window.toString()); - flushBatch(receiver, element.getKey(), batch, storedBatchSize, storedBatchSizeBytes); - // Reset the buffering timer (if not null) since the state is empty now and we want to - // release the watermark. It'll be extended again if a - // new element arrives prior to the expiration time set here. - // TODO(BEAM-10887): Use clear() when it's available. 
- if (maxBufferingDuration.isLongerThan(Duration.ZERO)) { - bufferingTimer.offset(maxBufferingDuration).setRelative(); - } + flushBatch( + receiver, + element.getKey(), + batch, + storedBatchSize, + storedBatchSizeBytes, + timerTs, + minBufferedTs); + bufferingTimer.clear(); } } @@ -430,12 +485,15 @@ public class GroupIntoBatches<K, InputT> @StateId(BATCH_ID) BagState<InputT> batch, @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSize, @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSizeBytes, + @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs, + @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> minBufferedTs, @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer) { LOG.debug( "*** END OF BUFFERING *** for timer timestamp {} with buffering duration {}", timestamp, maxBufferingDuration); - flushBatch(receiver, key, batch, storedBatchSize, storedBatchSizeBytes); + flushBatch( + receiver, key, batch, storedBatchSize, storedBatchSizeBytes, timerTs, minBufferedTs); } @OnWindowExpiration @@ -444,8 +502,11 @@ public class GroupIntoBatches<K, InputT> @Key K key, @StateId(BATCH_ID) BagState<InputT> batch, @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSize, - @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSizeBytes) { - flushBatch(receiver, key, batch, storedBatchSize, storedBatchSizeBytes); + @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSizeBytes, + @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs, + @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> minBufferedTs) { + flushBatch( + receiver, key, batch, storedBatchSize, storedBatchSizeBytes, timerTs, minBufferedTs); } // We no longer set this timer, since OnWindowExpiration takes care of this. 
However we leave the @@ -459,12 +520,15 @@ public class GroupIntoBatches<K, InputT> @StateId(BATCH_ID) BagState<InputT> batch, @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSize, @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> storedBatchSizeBytes, + @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs, + @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> minBufferedTs, BoundedWindow window) { LOG.debug( "*** END OF WINDOW *** for timer timestamp {} in windows {}", timestamp, window.toString()); - flushBatch(receiver, key, batch, storedBatchSize, storedBatchSizeBytes); + flushBatch( + receiver, key, batch, storedBatchSize, storedBatchSizeBytes, timerTs, minBufferedTs); } private void flushBatch( @@ -472,16 +536,29 @@ public class GroupIntoBatches<K, InputT> K key, BagState<InputT> batch, CombiningState<Long, long[], Long> storedBatchSize, - CombiningState<Long, long[], Long> storedBatchSizeBytes) { + CombiningState<Long, long[], Long> storedBatchSizeBytes, + ValueState<Long> timerTs, + CombiningState<Long, long[], Long> minBufferedTs) { Iterable<InputT> values = batch.read(); // When the timer fires, batch state might be empty if (!Iterables.isEmpty(values)) { receiver.output(KV.of(key, values)); } + clearState(batch, storedBatchSize, storedBatchSizeBytes, timerTs, minBufferedTs); + ; + } + + private void clearState( + BagState<InputT> batch, + CombiningState<Long, long[], Long> storedBatchSize, + CombiningState<Long, long[], Long> storedBatchSizeBytes, + ValueState<Long> timerTs, + CombiningState<Long, long[], Long> minBufferedTs) { batch.clear(); - LOG.debug("*** BATCH *** clear"); storedBatchSize.clear(); storedBatchSizeBytes.clear(); + timerTs.clear(); + minBufferedTs.clear(); } } }