Repository: incubator-beam
Updated Branches:
  refs/heads/master 9fab4ba51 -> 7c2124ba4


Only remove elements that were pending from the pending elements

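Update the comparator so that an element is removed from the
collection of pending elements only if that same element was previously
added. The old comparator ordered elements by timestamp alone, so two
distinct elements with equal timestamps compared as equal, and removing
one of them (for example, an element of a timer delivery that shares a
pending element's timestamp) could remove a different pending element.
Timestamp ties are now broken with an arbitrary ordering, so distinct
elements never compare as equal. Because of this, pending elements are
tracked as the bundle's own elements rather than as per-window copies
produced by exploding their windows.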


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/da57ae81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/da57ae81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/da57ae81

Branch: refs/heads/master
Commit: da57ae814bdc9c8d0661c27906f4a6e3c9e7474f
Parents: ecbc641
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 7 15:50:05 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Oct 7 15:50:05 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java   | 31 +++++------------
 .../runners/direct/WatermarkManagerTest.java    | 36 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da57ae81/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index b3d1fc5..4792c39 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -21,10 +21,8 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -210,7 +208,13 @@ public class WatermarkManager {
 
     public AppliedPTransformInputWatermark(Collection<? extends Watermark> 
inputWatermarks) {
       this.inputWatermarks = inputWatermarks;
-      this.pendingElements = TreeMultiset.create(new 
WindowedValueByTimestampComparator());
+      // The ordering must order elements by timestamp, and must not compare 
two distinct elements
+      // as equal. This is built on the assumption that any element added as a 
pending element will
+      // be consumed without modifications.
+      Ordering<WindowedValue<?>> pendingElementComparator =
+          new 
WindowedValueByTimestampComparator().compound(Ordering.arbitrary());
+      this.pendingElements =
+          TreeMultiset.create(pendingElementComparator);
       this.objectTimers = new HashMap<>();
       currentWatermark = new 
AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
@@ -626,19 +630,6 @@ public class WatermarkManager {
   private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
 
   /**
-   * A function that takes a WindowedValue and returns the exploded 
representation of that
-   * {@link WindowedValue}.
-   */
-  private static final Function<WindowedValue<?>, ? extends Iterable<? extends 
WindowedValue<?>>>
-      EXPLODE_WINDOWS_FN =
-          new Function<WindowedValue<?>, Iterable<? extends 
WindowedValue<?>>>() {
-            @Override
-            public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> 
input) {
-              return input.explodeWindows();
-            }
-          };
-
-  /**
    * For each (Object, PriorityQueue) pair in the provided map, remove each 
Timer that is before the
    * latestTime argument and put in in the result with the same key, then 
remove all of the keys
    * which have no more pending timers.
@@ -1128,19 +1119,15 @@ public class WatermarkManager {
     }
 
     private void removePending(CommittedBundle<?> bundle) {
-      inputWatermark.removePendingElements(elementsFromBundle(bundle));
+      inputWatermark.removePendingElements(bundle.getElements());
       synchronizedProcessingInputWatermark.removePending(bundle);
     }
 
     private void addPending(CommittedBundle<?> bundle) {
-      inputWatermark.addPendingElements(elementsFromBundle(bundle));
+      inputWatermark.addPendingElements(bundle.getElements());
       synchronizedProcessingInputWatermark.addPending(bundle);
     }
 
-    private Iterable<? extends WindowedValue<?>> 
elementsFromBundle(CommittedBundle<?> bundle) {
-      return 
FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN);
-    }
-
     private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
       Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
           inputWatermark.extractFiredEventTimeTimers();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da57ae81/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index a722b49..8a27243 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -563,6 +563,7 @@ public class WatermarkManagerTest implements Serializable {
         .add(second)
         .add(third)
         .commit(clock.now());
+
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
@@ -585,6 +586,41 @@ public class WatermarkManagerTest implements Serializable {
         keyedWatermarks.getInputWatermark(), not(laterThan(new 
Instant(-1000L))));
   }
 
+  @Test
+  public void updateWatermarkWithCompletedElementsNotPending() {
+    WindowedValue<Integer> first = 
WindowedValue.timestampedValueInGlobalWindow(1, new Instant(22));
+    CommittedBundle<Integer> createdBundle = 
bundleFactory.createBundle(createdInts)
+        .add(first)
+        .commit(clock.now());
+
+    WindowedValue<Integer> second =
+        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(22));
+    CommittedBundle<Integer> neverCreatedBundle = 
bundleFactory.createBundle(createdInts)
+        .add(second)
+        .commit(clock.now());
+
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    manager.updateWatermarks(
+        neverCreatedBundle,
+        TimerUpdate.empty(),
+        result(
+            filtered.getProducingTransformInternal(),
+            
neverCreatedBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Collections.<CommittedBundle<?>>emptyList()),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    manager.refreshAll();
+    TransformWatermarks filteredWms =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(filteredWms.getInputWatermark(), equalTo(new Instant(22L)));
+  }
+
   /**
    * Demonstrates that updateWatermarks in the presence of late data is 
monotonic.
    */

Reply via email to