This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 88acc52  [BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle (#11924)
88acc52 is described below

commit 88acc5267f759d81e9836a9db17b9e0ee521c785
Author: Rehman Murad Ali <rehmanmurada...@gmail.com>
AuthorDate: Thu Jul 30 01:06:52 2020 +0500

    [BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle (#11924)
---
 CHANGES.md                                         |  1 +
 .../worker/StreamingModeExecutionContext.java      | 75 +++++++++++++++++-----
 .../dataflow/worker/WindmillTimerInternals.java    | 25 ++++++++
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 69 ++++++++++++++++----
 4 files changed, 141 insertions(+), 29 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1cfc52c..c2665e7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -146,6 +146,7 @@
 * Upgrade Sphinx to 3.0.3 for building PyDoc.
 * Added a PTransform for image annotation using Google Cloud AI image processing service
 ([BEAM-9646](https://issues.apache.org/jira/browse/BEAM-9646))
+* Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).
 
 ## Breaking Changes
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 7ed390b..334f145 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -24,10 +24,12 @@ import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.SideInputInfo;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
@@ -519,7 +521,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
               synchronizedProcessingTime);
 
       this.cachedFiredTimers = null;
-      this.cachedFiredUserTimers = null;
+      this.toBeFiredTimersOrdered = null;
     }
 
     public void flushState() {
@@ -559,28 +561,67 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
       return nextTimer;
     }
 
-    // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PriorityQueue<TimerData> toBeFiredTimersOrdered = null;
+
+    // to track if timer is reset earlier mid-bundle.
+    // Map of timer's id to timer's firing time to check
+    // the actual firing time of a timer.
+    private Map<String, Instant> firedTimer = new HashMap<>();
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> 
windowCoder) {
-      if (cachedFiredUserTimers == null) {
-        cachedFiredUserTimers =
-            
FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, 
timer, windowCoder))
-                .iterator();
+      if (toBeFiredTimersOrdered == null) {
+
+        toBeFiredTimersOrdered = new 
PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
+        
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+            .filter(
+                timer ->
+                    WindmillTimerInternals.isUserTimer(timer)
+                        && timer.getStateFamily().equals(stateFamily))
+            .transform(
+                timer ->
+                    WindmillTimerInternals.windmillTimerToTimerData(
+                        WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, 
windowCoder))
+            .iterator()
+            .forEachRemaining(
+                timerData -> {
+                  firedTimer.put(
+                      timerData.getTimerId() + '+' + 
timerData.getTimerFamilyId(),
+                      timerData.getTimestamp());
+                  toBeFiredTimersOrdered.add(timerData);
+                });
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
+      Instant currentInputWatermark = 
userTimerInternals.currentInputWatermarkTime();
+
+      if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
+        List<TimerData> currentTimers = userTimerInternals.getCurrentTimers();
+
+        for (TimerData timerData : currentTimers) {
+          firedTimer.put(
+              timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
+              timerData.getTimestamp());
+          toBeFiredTimersOrdered.add(timerData);
+        }
+      }
+
+      TimerData nextTimer = null;
+
+      // fire timer only if its timestamp matched. Else it is either reset or 
obsolete.
+      while (!toBeFiredTimersOrdered.isEmpty()) {
+        nextTimer = toBeFiredTimersOrdered.poll();
+        String timerUniqueId = nextTimer.getTimerId() + '+' + 
nextTimer.getTimerFamilyId();
+        if (firedTimer.containsKey(timerUniqueId)
+            && 
firedTimer.get(timerUniqueId).isEqual(nextTimer.getTimestamp())) {
+          break;
+        } else {
+          nextTimer = null;
+        }
+      }
+
+      if (nextTimer == null) {
         return null;
       }
-      TimerData nextTimer = cachedFiredUserTimers.next();
+
       // User timers must be explicitly deleted when delivered, to release the 
implied hold
       userTimerInternals.deleteTimer(nextTimer);
       return nextTimer;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index f46fd49..5269cf2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -22,6 +22,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
@@ -225,6 +227,29 @@ class WindmillTimerInternals implements TimerInternals {
     timers.clear();
   }
 
+  public boolean hasTimerBefore(Instant time) {
+    for (Cell<String, StateNamespace, Boolean> cell : 
timerStillPresent.cellSet()) {
+      TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
+      if (cell.getValue()) {
+        if (timerData.getTimestamp().isBefore(time)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public List<TimerData> getCurrentTimers() {
+    List<TimerData> timerDataList = new ArrayList<>();
+    for (Cell<String, StateNamespace, Boolean> cell : 
timerStillPresent.cellSet()) {
+      TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
+      if (cell.getValue()) {
+        timerDataList.add(timerData);
+      }
+    }
+    return timerDataList;
+  }
+
   private boolean needsWatermarkHold(TimerData timerData) {
     // If it is a user timer or a system timer with outputTimestamp different 
than timestamp
     return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3f0d13c..776fd3e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3959,7 +3959,7 @@ public class ParDoTest implements Serializable {
       }
 
       testEventTimeTimerOrderingWithInputPTransform(
-          now, numTestElements, builder.advanceWatermarkToInfinity());
+          now, numTestElements, builder.advanceWatermarkToInfinity(), 
IsBounded.BOUNDED);
     }
 
     /** A test makes sure that an event time timers are correctly ordered 
using Create transform. */
@@ -3970,7 +3970,7 @@ public class ParDoTest implements Serializable {
       UsesStatefulParDo.class,
       UsesStrictTimerOrdering.class
     })
-    public void testEventTimeTimerOrderingWithCreate() throws Exception {
+    public void testEventTimeTimerOrderingWithCreateBounded() throws Exception 
{
       final int numTestElements = 100;
       final Instant now = new Instant(1500000000000L);
 
@@ -3980,13 +3980,39 @@ public class ParDoTest implements Serializable {
       }
 
       testEventTimeTimerOrderingWithInputPTransform(
-          now, numTestElements, Create.timestamped(elements));
+          now, numTestElements, Create.timestamped(elements), 
IsBounded.BOUNDED);
+    }
+
+    /**
+     * A test makes sure that an event time timers are correctly ordered using 
Create transform
+     * unbounded.
+     */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesStatefulParDo.class,
+      UsesUnboundedPCollections.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrderingWithCreateUnbounded() throws 
Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+
+      List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
+      for (int i = 0; i < numTestElements; i++) {
+        elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 
1000)));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(
+          now, numTestElements, Create.timestamped(elements), 
IsBounded.UNBOUNDED);
     }
 
     private void testEventTimeTimerOrderingWithInputPTransform(
         Instant now,
         int numTestElements,
-        PTransform<PBegin, PCollection<KV<String, String>>> transform)
+        PTransform<PBegin, PCollection<KV<String, String>>> transform,
+        IsBounded isBounded)
         throws Exception {
 
       final String timerIdBagAppend = "append";
@@ -4070,7 +4096,8 @@ public class ParDoTest implements Serializable {
             }
           };
 
-      PCollection<String> output = 
pipeline.apply(transform).apply(ParDo.of(fn));
+      PCollection<String> output =
+          
pipeline.apply(transform).setIsBoundedInternal(isBounded).apply(ParDo.of(fn));
       List<String> expected =
           IntStream.rangeClosed(0, numTestElements)
               .mapToObj(expandFn(numTestElements))
@@ -4154,16 +4181,25 @@ public class ParDoTest implements Serializable {
           TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of()))
               .addElements(KV.of(null, null))
               .advanceWatermarkToInfinity();
-      pipeline.apply(TwoTimerTest.of(now, end, input));
+      pipeline.apply(TwoTimerTest.of(now, end, input, IsBounded.BOUNDED));
+      pipeline.run();
+    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class, 
UsesStrictTimerOrdering.class})
+    public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() {
+      Instant now = new Instant(1500000000000L);
+      Instant end = now.plus(100);
+      pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), 
IsBounded.BOUNDED));
       pipeline.run();
     }
 
     @Test
     @Category({ValidatesRunner.class, UsesTimersInParDo.class, 
UsesStrictTimerOrdering.class})
-    public void testTwoTimersSettingEachOtherWithCreateAsInput() {
+    public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() {
       Instant now = new Instant(1500000000000L);
       Instant end = now.plus(100);
-      pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null))));
+      pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), 
IsBounded.UNBOUNDED));
       pipeline.run();
     }
 
@@ -4337,18 +4373,26 @@ public class ParDoTest implements Serializable {
     private static class TwoTimerTest extends PTransform<PBegin, PDone> {
 
       private static PTransform<PBegin, PDone> of(
-          Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, 
Void>>> input) {
-        return new TwoTimerTest(start, end, input);
+          Instant start,
+          Instant end,
+          PTransform<PBegin, PCollection<KV<Void, Void>>> input,
+          IsBounded isBounded) {
+        return new TwoTimerTest(start, end, input, isBounded);
       }
 
       private final Instant start;
       private final Instant end;
+      private final IsBounded isBounded;
       private final transient PTransform<PBegin, PCollection<KV<Void, Void>>> 
inputPTransform;
 
       public TwoTimerTest(
-          Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, 
Void>>> input) {
+          Instant start,
+          Instant end,
+          PTransform<PBegin, PCollection<KV<Void, Void>>> input,
+          IsBounded isBounded) {
         this.start = start;
         this.end = end;
+        this.isBounded = isBounded;
         this.inputPTransform = input;
       }
 
@@ -4361,6 +4405,7 @@ public class ParDoTest implements Serializable {
         PCollection<String> result =
             input
                 .apply(inputPTransform)
+                .setIsBoundedInternal(isBounded)
                 .apply(
                     ParDo.of(
                         new DoFn<KV<Void, Void>, String>() {
@@ -4425,7 +4470,7 @@ public class ParDoTest implements Serializable {
                         }));
 
         List<String> expected =
-            LongStream.rangeClosed(0, 100)
+            LongStream.rangeClosed(0, end.minus(start.getMillis()).getMillis())
                 .mapToObj(e -> (Long) e)
                 .flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e + 
":" + e).stream())
                 .collect(Collectors.toList());

Reply via email to