This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1591d3c  Merge pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance
1591d3c is described below

commit 1591d3cb3ee354fe53c31946d296eb890602b41d
Author: reuvenlax <re...@google.com>
AuthorDate: Fri Aug 6 13:30:20 2021 -0700

    Merge pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance
---
 .../beam/sdk/transforms/GroupIntoBatches.java      | 121 +++++++++++++++++----
 1 file changed, 99 insertions(+), 22 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index f8840b5..f5fbf5d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -35,12 +35,14 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ShardedKey;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.joda.time.Duration;
@@ -320,16 +322,19 @@ public class GroupIntoBatches<K, InputT>
     @TimerId(END_OF_WINDOW_ID)
     private final TimerSpec windowTimer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
+    // This timer expires when it's time to batch and output the buffered data.
     private static final String END_OF_BUFFERING_ID = "endOfBuffering";
 
     @TimerId(END_OF_BUFFERING_ID)
     private final TimerSpec bufferingTimer = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
 
+    // The set of elements that will go in the next batch.
     private static final String BATCH_ID = "batch";
 
     @StateId(BATCH_ID)
     private final StateSpec<BagState<InputT>> batchSpec;
 
+    // The size of the current batch.
     private static final String NUM_ELEMENTS_IN_BATCH_ID = 
"numElementsInBatch";
 
     @StateId(NUM_ELEMENTS_IN_BATCH_ID)
@@ -337,9 +342,24 @@ public class GroupIntoBatches<K, InputT>
 
     private static final String NUM_BYTES_IN_BATCH_ID = "numBytesInBatch";
 
+    // The byte size of the current batch.
     @StateId(NUM_BYTES_IN_BATCH_ID)
     private final StateSpec<CombiningState<Long, long[], Long>> 
batchSizeBytesSpec;
 
+    private static final String TIMER_TIMESTAMP = "timerTs";
+
+    // The timestamp of the current active timer.
+    @StateId(TIMER_TIMESTAMP)
+    private final StateSpec<ValueState<Long>> timerTsSpec;
+
+    // The minimum element timestamp currently buffered in the bag. This is 
used to set the output
+    // timestamp
+    // on the timer which ensures that the watermark correctly tracks the 
buffered elements.
+    private static final String MIN_BUFFERED_TS = "minBufferedTs";
+
+    @StateId(MIN_BUFFERED_TS)
+    private final StateSpec<CombiningState<Long, long[], Long>> 
minBufferedTsSpec;
+
     private final long prefetchFrequency;
 
     GroupIntoBatchesDoFn(
@@ -369,8 +389,23 @@ public class GroupIntoBatches<K, InputT>
             }
           };
 
+      Combine.BinaryCombineLongFn minCombineFn =
+          new Combine.BinaryCombineLongFn() {
+            @Override
+            public long identity() {
+              return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+            }
+
+            @Override
+            public long apply(long left, long right) {
+              return Math.min(left, right);
+            }
+          };
+
       this.batchSizeSpec = StateSpecs.combining(sumCombineFn);
       this.batchSizeBytesSpec = StateSpecs.combining(sumCombineFn);
+      this.timerTsSpec = StateSpecs.value();
+      this.minBufferedTsSpec = StateSpecs.combining(minCombineFn);
 
       // Prefetch every 20% of batchSize elements. Do not prefetch if 
batchSize is too little
       this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : 
(batchSize / 5);
@@ -378,17 +413,16 @@ public class GroupIntoBatches<K, InputT>
 
     @ProcessElement
     public void processElement(
-        @TimerId(END_OF_WINDOW_ID) Timer windowTimer,
         @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer,
         @StateId(BATCH_ID) BagState<InputT> batch,
         @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSize,
         @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSizeBytes,
+        @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs,
+        @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> 
minBufferedTs,
         @Element KV<K, InputT> element,
+        @Timestamp Instant elementTs,
         BoundedWindow window,
         OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
-      Instant windowEnds = window.maxTimestamp().plus(allowedLateness);
-      LOG.debug("*** SET TIMER *** to point in time {} for window {}", 
windowEnds, window);
-      windowTimer.set(windowEnds);
       LOG.debug("*** BATCH *** Add element for window {} ", window);
       batch.add(element.getValue());
       // Blind add is supported with combiningState
@@ -398,11 +432,31 @@ public class GroupIntoBatches<K, InputT>
         storedBatchSizeBytes.readLater();
       }
 
-      long num = storedBatchSize.read();
-      if (maxBufferingDuration.isLongerThan(Duration.ZERO) && num == 1) {
-        // This is the first element in batch. Start counting buffering time 
if a limit was set.
-        bufferingTimer.offset(maxBufferingDuration).setRelative();
+      long num;
+      if (maxBufferingDuration.isLongerThan(Duration.ZERO)) {
+        minBufferedTs.readLater();
+        num = storedBatchSize.read();
+
+        long oldOutputTs =
+            MoreObjects.firstNonNull(
+                minBufferedTs.read(), 
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+        minBufferedTs.add(elementTs.getMillis());
+        // If this is the first element in the batch or if the timer's output 
timestamp needs
+        // modifying, then set a
+        // timer.
+        if (num == 1 || minBufferedTs.read() != oldOutputTs) {
+          long targetTs =
+              MoreObjects.firstNonNull(
+                  timerTs.read(),
+                  bufferingTimer.getCurrentRelativeTime().getMillis()
+                      + maxBufferingDuration.getMillis());
+          bufferingTimer
+              .withOutputTimestamp(Instant.ofEpochMilli(minBufferedTs.read()))
+              .set(Instant.ofEpochMilli(targetTs));
+        }
       }
+      num = storedBatchSize.read();
+
       if (num % prefetchFrequency == 0) {
         // Prefetch data and modify batch state (readLater() modifies this)
         batch.readLater();
@@ -411,14 +465,15 @@ public class GroupIntoBatches<K, InputT>
       if (num >= batchSize
           || (batchSizeBytes != Long.MAX_VALUE && storedBatchSizeBytes.read() 
>= batchSizeBytes)) {
         LOG.debug("*** END OF BATCH *** for window {}", window.toString());
-        flushBatch(receiver, element.getKey(), batch, storedBatchSize, 
storedBatchSizeBytes);
-        //  Reset the buffering timer (if not null) since the state is empty 
now and we want to
-        // release the watermark. It'll be extended again if a
-        // new element arrives prior to the expiration time set here.
-        // TODO(BEAM-10887): Use clear() when it's available.
-        if (maxBufferingDuration.isLongerThan(Duration.ZERO)) {
-          bufferingTimer.offset(maxBufferingDuration).setRelative();
-        }
+        flushBatch(
+            receiver,
+            element.getKey(),
+            batch,
+            storedBatchSize,
+            storedBatchSizeBytes,
+            timerTs,
+            minBufferedTs);
+        bufferingTimer.clear();
       }
     }
 
@@ -430,12 +485,15 @@ public class GroupIntoBatches<K, InputT>
         @StateId(BATCH_ID) BagState<InputT> batch,
         @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSize,
         @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSizeBytes,
+        @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs,
+        @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> 
minBufferedTs,
         @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer) {
       LOG.debug(
           "*** END OF BUFFERING *** for timer timestamp {} with buffering 
duration {}",
           timestamp,
           maxBufferingDuration);
-      flushBatch(receiver, key, batch, storedBatchSize, storedBatchSizeBytes);
+      flushBatch(
+          receiver, key, batch, storedBatchSize, storedBatchSizeBytes, 
timerTs, minBufferedTs);
     }
 
     @OnWindowExpiration
@@ -444,8 +502,11 @@ public class GroupIntoBatches<K, InputT>
         @Key K key,
         @StateId(BATCH_ID) BagState<InputT> batch,
         @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSize,
-        @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSizeBytes) {
-      flushBatch(receiver, key, batch, storedBatchSize, storedBatchSizeBytes);
+        @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSizeBytes,
+        @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs,
+        @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> 
minBufferedTs) {
+      flushBatch(
+          receiver, key, batch, storedBatchSize, storedBatchSizeBytes, 
timerTs, minBufferedTs);
     }
 
     // We no longer set this timer, since OnWindowExpiration takes care of 
his. However we leave the
@@ -459,12 +520,15 @@ public class GroupIntoBatches<K, InputT>
         @StateId(BATCH_ID) BagState<InputT> batch,
         @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSize,
         @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSizeBytes,
+        @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs,
+        @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> 
minBufferedTs,
         BoundedWindow window) {
       LOG.debug(
           "*** END OF WINDOW *** for timer timestamp {} in windows {}",
           timestamp,
           window.toString());
-      flushBatch(receiver, key, batch, storedBatchSize, storedBatchSizeBytes);
+      flushBatch(
+          receiver, key, batch, storedBatchSize, storedBatchSizeBytes, 
timerTs, minBufferedTs);
     }
 
     private void flushBatch(
@@ -472,16 +536,29 @@ public class GroupIntoBatches<K, InputT>
         K key,
         BagState<InputT> batch,
         CombiningState<Long, long[], Long> storedBatchSize,
-        CombiningState<Long, long[], Long> storedBatchSizeBytes) {
+        CombiningState<Long, long[], Long> storedBatchSizeBytes,
+        ValueState<Long> timerTs,
+        CombiningState<Long, long[], Long> minBufferedTs) {
       Iterable<InputT> values = batch.read();
       // When the timer fires, batch state might be empty
       if (!Iterables.isEmpty(values)) {
         receiver.output(KV.of(key, values));
       }
+      clearState(batch, storedBatchSize, storedBatchSizeBytes, timerTs, 
minBufferedTs);
+      ;
+    }
+
+    private void clearState(
+        BagState<InputT> batch,
+        CombiningState<Long, long[], Long> storedBatchSize,
+        CombiningState<Long, long[], Long> storedBatchSizeBytes,
+        ValueState<Long> timerTs,
+        CombiningState<Long, long[], Long> minBufferedTs) {
       batch.clear();
-      LOG.debug("*** BATCH *** clear");
       storedBatchSize.clear();
       storedBatchSizeBytes.clear();
+      timerTs.clear();
+      minBufferedTs.clear();
     }
   }
 }

Reply via email to