Allow InProcess TransformEvaluators to refuse inputs

Inputs that cannot be processed (generally because a side input is not
yet ready) can be added to a list of unprocessed elements; those elements
are then rescheduled for execution at a later point.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0518fc68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0518fc68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0518fc68

Branch: refs/heads/master
Commit: 0518fc687febca6dca75159924265fdc6196a572
Parents: 59cca8d
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 3 13:24:25 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 6 11:17:37 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CommittedResult.java    |  16 ++-
 .../direct/ExecutorServiceParallelExecutor.java |   6 +
 .../direct/InMemoryWatermarkManager.java        |   7 +-
 .../direct/InProcessEvaluationContext.java      |   6 +-
 .../direct/InProcessTransformResult.java        |   7 +
 .../runners/direct/StepTransformResult.java     |  17 +++
 .../runners/direct/CommittedResultTest.java     |  34 ++++-
 .../direct/InMemoryWatermarkManagerTest.java    | 127 ++++++++++++++++---
 .../runners/direct/TransformExecutorTest.java   |  11 +-
 9 files changed, 209 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index d15e012..4a42e34 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 
 import com.google.auto.value.AutoValue;
 
+import javax.annotation.Nullable;
+
 /**
  * A {@link InProcessTransformResult} that has been committed.
  */
@@ -34,13 +36,25 @@ abstract class CommittedResult {
   public abstract AppliedPTransform<?, ?, ?> getTransform();
 
   /**
+   * Returns the {@link CommittedBundle} that contains the input elements that 
could not be
+   * processed by the evaluation.
+   *
+   * <p>{@code null} if the input bundle was null.
+   */
+  @Nullable
+  public abstract CommittedBundle<?> getUnprocessedInputs();
+
+  /**
    * Returns the outputs produced by the transform.
    */
   public abstract Iterable<? extends CommittedBundle<?>> getOutputs();
 
   public static CommittedResult create(
-      InProcessTransformResult original, Iterable<? extends 
CommittedBundle<?>> outputs) {
+      InProcessTransformResult original,
+      CommittedBundle<?> unprocessedElements,
+      Iterable<? extends CommittedBundle<?>> outputs) {
     return new AutoValue_CommittedResult(original.getTransform(),
+        unprocessedElements,
         outputs);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index fd4cc2c..570ddc4 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -37,6 +37,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -230,6 +231,11 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
             valueToConsumers.get(outputBundle.getPCollection())));
       }
+      CommittedBundle<?> unprocessedInputs = 
committedResult.getUnprocessedInputs();
+      if (unprocessedInputs != null && 
!Iterables.isEmpty(unprocessedInputs.getElements())) {
+        allUpdates.offer(ExecutorUpdate.fromBundle(unprocessedInputs,
+            Collections.<AppliedPTransform<?, ?, 
?>>singleton(committedResult.getTransform())));
+      }
       return committedResult;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 4d5a3a1..87ea4d5 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -849,8 +849,6 @@ public class InMemoryWatermarkManager {
       CommittedBundle<?> input,
       TimerUpdate timerUpdate,
       CommittedResult result) {
-    TransformWatermarks completedTransform = 
transformToWatermarks.get(result.getTransform());
-
     // Newly pending elements must be added before completed elements are 
removed, as the two
     // do not share a Mutex within this call and thus can be interleaved with 
external calls to
     // refresh.
@@ -861,6 +859,11 @@ public class InMemoryWatermarkManager {
       }
     }
 
+    TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
+    // getUnprocessedInputs() is @Nullable; guard on the value itself rather than relying on it
+    // being non-null whenever the input bundle is non-null.
+    CommittedBundle<?> unprocessedInputs = result.getUnprocessedInputs();
+    if (input != null && unprocessedInputs != null) {
+      // Re-add the elements the evaluator did not process so they remain pending for this
+      // transform. This must happen before the input is removed from the pending set below.
+      completedTransform.addPending(unprocessedInputs);
+    }
     completedTransform.updateTimers(timerUpdate);
     if (input != null) {
       completedTransform.removePending(input);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index d4f891e..5c19287 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -152,7 +152,11 @@ class InProcessEvaluationContext {
     Iterable<? extends CommittedBundle<?>> committedBundles =
         commitBundles(result.getOutputBundles());
     // Update watermarks and timers
-    CommittedResult committedResult = CommittedResult.create(result, 
committedBundles);
+    CommittedResult committedResult = CommittedResult.create(result,
+        completedBundle == null ?
+            null :
+            completedBundle.withElements((Iterable) 
result.getUnprocessedElements()),
+        committedBundles);
     watermarkManager.updateWatermarks(
         completedBundle,
         result.getTimerUpdate().withCompletedTimers(completedTimers),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
index a132c33..0bc3ea1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
@@ -22,6 +22,7 @@ import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 
@@ -45,6 +46,12 @@ public interface InProcessTransformResult {
   Iterable<? extends UncommittedBundle<?>> getOutputBundles();
 
   /**
+   * Returns elements that were provided to the {@link TransformEvaluator} as 
input but were not
+   * processed.
+   */
+  Iterable<? extends WindowedValue<?>> getUnprocessedElements();
+
+  /**
    * Returns the {@link CounterSet} used by this {@link PTransform}, or null 
if this transform did
    * not use a {@link CounterSet}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 46e7d04..b2e3897 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -23,6 +23,7 @@ import 
org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
 import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
 public class StepTransformResult implements InProcessTransformResult {
   private final AppliedPTransform<?, ?, ?> transform;
   private final Iterable<? extends UncommittedBundle<?>> bundles;
+  private final Iterable<? extends WindowedValue<?>> unprocessedElements;
   @Nullable private final CopyOnAccessInMemoryStateInternals<?> state;
   private final TimerUpdate timerUpdate;
   @Nullable private final CounterSet counters;
@@ -49,12 +51,14 @@ public class StepTransformResult implements 
InProcessTransformResult {
   private StepTransformResult(
       AppliedPTransform<?, ?, ?> transform,
       Iterable<? extends UncommittedBundle<?>> outputBundles,
+      Iterable<? extends WindowedValue<?>> unprocessedElements,
       CopyOnAccessInMemoryStateInternals<?> state,
       TimerUpdate timerUpdate,
       CounterSet counters,
       Instant watermarkHold) {
     this.transform = checkNotNull(transform);
     this.bundles = checkNotNull(outputBundles);
+    this.unprocessedElements = checkNotNull(unprocessedElements);
     this.state = state;
     this.timerUpdate = checkNotNull(timerUpdate);
     this.counters = counters;
@@ -67,6 +71,11 @@ public class StepTransformResult implements 
InProcessTransformResult {
   }
 
   @Override
+  public Iterable<? extends WindowedValue<?>> getUnprocessedElements() {
+    return unprocessedElements;
+  }
+
+  @Override
   public CounterSet getCounters() {
     return counters;
   }
@@ -113,6 +122,7 @@ public class StepTransformResult implements 
InProcessTransformResult {
   public static class Builder {
     private final AppliedPTransform<?, ?, ?> transform;
     private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
+    private final ImmutableList.Builder<WindowedValue<?>> 
unprocessedElementsBuilder;
     private CopyOnAccessInMemoryStateInternals<?> state;
     private TimerUpdate timerUpdate;
     private CounterSet counters;
@@ -122,6 +132,7 @@ public class StepTransformResult implements 
InProcessTransformResult {
       this.transform = transform;
       this.watermarkHold = watermarkHold;
       this.bundlesBuilder = ImmutableList.builder();
+      this.unprocessedElementsBuilder = ImmutableList.builder();
       this.timerUpdate = TimerUpdate.builder(null).build();
     }
 
@@ -129,6 +140,7 @@ public class StepTransformResult implements 
InProcessTransformResult {
       return new StepTransformResult(
           transform,
           bundlesBuilder.build(),
+          unprocessedElementsBuilder.build(),
           state,
           timerUpdate,
           counters,
@@ -150,6 +162,11 @@ public class StepTransformResult implements 
InProcessTransformResult {
       return this;
     }
 
+    public Builder addUnprocessedElements(Iterable<? extends WindowedValue<?>> 
unprocessed) {
+      unprocessedElementsBuilder.addAll(unprocessed);
+      return this;
+    }
+
     public Builder addOutput(
         UncommittedBundle<?> outputBundle, UncommittedBundle<?>... 
outputBundles) {
       bundlesBuilder.add(outputBundle);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index b30e005..0d1b464 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -46,6 +49,7 @@ import java.util.List;
 @RunWith(JUnit4.class)
 public class CommittedResultTest implements Serializable {
   private transient TestPipeline p = TestPipeline.create();
+  private transient PCollection<Integer> created = p.apply(Create.of(1, 2));
   private transient AppliedPTransform<?, ?, ?> transform =
       AppliedPTransform.of("foo", p.begin(), PDone.in(p), new 
PTransform<PBegin, PDone>() {
       });
@@ -55,12 +59,38 @@ public class CommittedResultTest implements Serializable {
   public void getTransformExtractsFromResult() {
     CommittedResult result =
         
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            bundleFactory.createRootBundle(created).commit(Instant.now()),
             
Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
 
     assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, 
?>>equalTo(transform));
   }
 
   @Test
+  public void getUncommittedElementsEqualInput() {
+    InProcessPipelineRunner.CommittedBundle<Integer> bundle =
+        bundleFactory.createRootBundle(created)
+            .add(WindowedValue.valueInGlobalWindow(2))
+            .commit(Instant.now());
+    CommittedResult result =
+        
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            bundle,
+            
Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+
+    assertThat(result.getUnprocessedInputs(),
+        Matchers.<InProcessPipelineRunner.CommittedBundle<?>>equalTo(bundle));
+  }
+
+  @Test
+  public void getUncommittedElementsNull() {
+    CommittedResult result =
+        
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            null,
+            
Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+
+    assertThat(result.getUnprocessedInputs(), nullValue());
+  }
+
+  @Test
   public void getOutputsEqualInput() {
     List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs =
         
ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
@@ -70,7 +100,9 @@ public class CommittedResultTest implements Serializable {
                 WindowingStrategy.globalDefault(),
                 PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
     CommittedResult result =
-        
CommittedResult.create(StepTransformResult.withoutHold(transform).build(), 
outputs);
+        
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            bundleFactory.createRootBundle(created).commit(Instant.now()),
+            outputs);
 
     assertThat(result.getOutputs(), 
Matchers.containsInAnyOrder(outputs.toArray()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index 15cdf8a..b45440d 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -71,6 +71,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
 /**
  * Tests for {@link InMemoryWatermarkManager}.
  */
@@ -162,6 +164,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(output)),
         new Instant(8000L));
     TransformWatermarks updatedSourceWatermark =
@@ -181,6 +184,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(intsToFlatten.getProducingTransformInternal(),
+            null,
             
Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -213,6 +217,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(secondPcollectionBundle,
         TimerUpdate.empty(),
         result(flattened.getProducingTransformInternal(),
+            
secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
         null);
     TransformWatermarks transformAfterProcessing =
@@ -220,6 +225,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(secondPcollectionBundle,
         TimerUpdate.empty(),
         result(flattened.getProducingTransformInternal(),
+            
secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
         null);
     assertThat(
@@ -237,6 +243,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
         new Instant(Long.MAX_VALUE));
     TransformWatermarks firstSourceWatermarks =
@@ -267,6 +274,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(firstPcollectionBundle,
         TimerUpdate.empty(),
         result(flattened.getProducingTransformInternal(),
+            
firstPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
         null);
     TransformWatermarks afterConsumingAllInput =
@@ -291,6 +299,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
     TransformWatermarks createdAfterProducing =
@@ -306,6 +315,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
             null);
     TransformWatermarks keyedWatermarks =
@@ -325,6 +335,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         null);
     TransformWatermarks filteredProcessedWatermarks =
@@ -350,6 +361,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
 
@@ -360,6 +372,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
     TransformWatermarks keyedWatermarks =
@@ -389,17 +402,20 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             ImmutableList.of(firstKeyBundle, secondKeyBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.updateWatermarks(firstKeyBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
firstKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(-1000L));
     manager.updateWatermarks(secondKeyBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
secondKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(1234L));
 
@@ -414,6 +430,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(fauxFirstKeyTimerBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
fauxFirstKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -424,6 +441,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(5678L));
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new 
Instant(5678L)));
@@ -431,6 +449,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     assertThat(filteredWatermarks.getOutputWatermark(),
@@ -447,6 +466,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         
bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null,  TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(firstInput)),
         new Instant(0L));
     TransformWatermarks firstWatermarks =
@@ -458,6 +478,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(secondInput)),
         new Instant(-250L));
     TransformWatermarks secondWatermarks =
@@ -473,10 +494,12 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
   public void updateWatermarkWithHoldsShouldBeMonotonic() {
     CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
         TimestampedValue.of(1, new Instant(1_000_000L)),
-            TimestampedValue.of(2, new Instant(1234L)),
-            TimestampedValue.of(3, new Instant(-1000L))); 
manager.updateWatermarks(null,
+        TimestampedValue.of(2, new Instant(1234L)),
+        TimestampedValue.of(3, new Instant(-1000L)));
+    manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
 
@@ -487,6 +510,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
     TransformWatermarks keyedWatermarks =
@@ -505,6 +529,40 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     assertThat(updatedWatermarks.getOutputWatermark(), 
equalTo(oldOutputWatermark));
   }
 
+  @Test
+  public void updateWatermarkWithUnprocessedElements() {
+    WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(1);
+    WindowedValue<Integer> second =
+        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L));
+    WindowedValue<Integer> third =
+        WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L));
+    CommittedBundle<Integer> createdBundle = 
bundleFactory.createRootBundle(createdInts)
+        .add(first)
+        .add(second)
+        .add(third)
+        .commit(clock.now());
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
+        TimestampedValue.of(KV.of("MyKey", 1), 
BoundedWindow.TIMESTAMP_MIN_VALUE));
+    manager.updateWatermarks(createdBundle,
+        TimerUpdate.empty(),
+        result(keyed.getProducingTransformInternal(),
+            createdBundle.withElements(ImmutableList.of(second, third)),
+            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    TransformWatermarks keyedWatermarks =
+        manager.getWatermarks(keyed.getProducingTransformInternal());
+    // the unprocessed second and third are readded to pending
+    assertThat(
+        keyedWatermarks.getInputWatermark(), not(laterThan(new 
Instant(-1000L))));
+  }
+
   /**
    * Demonstrates that updateWatermarks in the presence of late data is 
monotonic.
    */
@@ -517,6 +575,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         sourceWatermark);
 
@@ -528,6 +587,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         null);
     TransformWatermarks onTimeWatermarks =
@@ -542,6 +602,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
         new Instant(2_000_000L));
     TransformWatermarks bufferedLateWm =
@@ -560,6 +621,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(lateDataBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            
lateDataBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
         null);
   }
@@ -567,8 +629,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
   public void updateWatermarkWithDifferentWindowedValueInstances() {
     manager.updateWatermarks(
         null,
-        TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        TimerUpdate.empty(), 
result(createdInts.getProducingTransformInternal(), null,
         Collections.<CommittedBundle<?>>singleton(
             bundleFactory
                 .createRootBundle(createdInts)
@@ -576,12 +637,14 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
                 .commit(Instant.now()))),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    manager.updateWatermarks(
-        bundleFactory.createRootBundle(createdInts)
-            .add(WindowedValue.valueInGlobalWindow(1))
-            .commit(Instant.now()),
+    CommittedBundle<Integer> createdBundle = 
bundleFactory.createRootBundle(createdInts)
+        .add(WindowedValue.valueInGlobalWindow(1))
+        .commit(Instant.now());
+    manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(), 
Collections.<CommittedBundle<?>>emptyList()),
+        result(keyed.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Collections.<CommittedBundle<?>>emptyList()),
         null);
     TransformWatermarks onTimeWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -598,6 +661,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks updatedSourceWatermarks =
@@ -627,6 +691,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
         new Instant(12_000L));
 
@@ -634,6 +699,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(firstCreateOutput,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
firstCreateOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
         new Instant(10_000L));
     TransformWatermarks firstFilterWatermarks =
@@ -645,6 +711,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks updatedSourceWatermarks =
@@ -687,6 +754,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks createAfterUpdate =
@@ -716,6 +784,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(createOutput,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
createOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks filterAfterConsumed =
@@ -737,8 +806,10 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
   //  @Test
   public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 
1, 2, 4, 8);
-    manager.updateWatermarks(null,  TimerUpdate.empty(),
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(1248L));
 
@@ -759,6 +830,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(createdBundle,
         timers,
         result(filtered.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     Instant startTime = clock.now();
@@ -796,6 +868,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         TimerUpdate.builder("key")
             
.withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
         result(filtered.getProducingTransformInternal(),
+            
filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -809,6 +882,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(filteredTimerResult,
         TimerUpdate.empty(),
         result(filteredTimesTwo.getProducingTransformInternal(),
+            
filteredTimerResult.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), 
equalTo(clock.now()));
@@ -846,6 +920,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks createAfterUpdate =
@@ -859,6 +934,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -871,6 +947,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(created)),
         new Instant(40_900L));
 
@@ -881,6 +958,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(created,
         TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
         result(filtered.getProducingTransformInternal(),
+            
created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -901,6 +979,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         TimerUpdate.builder("key")
             
.withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
         result(filtered.getProducingTransformInternal(),
+            
otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -914,6 +993,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(created)),
         new Instant(29_919_235L));
 
@@ -924,6 +1004,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         created,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            
created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -946,7 +1027,9 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(), 
Collections.singleton(createdBundle)),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.singleton(createdBundle)),
         new Instant(1500L));
 
     TimerData earliestTimer =
@@ -966,6 +1049,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(createdBundle,
         update,
         result(filtered.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
@@ -982,6 +1066,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
secondTransformFiredTimers =
@@ -1007,7 +1092,9 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(), 
Collections.singleton(createdBundle)),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.singleton(createdBundle)),
         new Instant(1500L));
 
     TimerData earliestTimer =
@@ -1028,7 +1115,8 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         createdBundle,
         update,
         result(filtered.getProducingTransformInternal(),
-        
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
firstTransformFiredTimers =
@@ -1045,6 +1133,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
secondTransformFiredTimers =
@@ -1070,7 +1159,9 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(), 
Collections.singleton(createdBundle)),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.singleton(createdBundle)),
         new Instant(1500L));
 
     TimerData earliestTimer = TimerData.of(
@@ -1091,6 +1182,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         createdBundle,
         update,
         result(filtered.getProducingTransformInternal(),
+            
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
@@ -1109,6 +1201,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
secondTransformFiredTimers =
@@ -1273,8 +1366,10 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
 
   private final CommittedResult result(
       AppliedPTransform<?, ?, ?> transform,
+      @Nullable CommittedBundle<?> unprocessedBundle,
       Iterable<? extends CommittedBundle<?>> bundles) {
-    return CommittedResult.create(StepTransformResult.withoutHold(transform)
-        .build(), bundles);
+    return 
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+        unprocessedBundle,
+        bundles);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 959e9d3..8b6053e 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -491,7 +491,16 @@ public class TransformExecutorTest {
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
       handledResult = result;
       onMethod.countDown();
-      return CommittedResult.create(result, 
Collections.<CommittedBundle<?>>emptyList());
+      @SuppressWarnings("rawtypes") Iterable unprocessedElements =
+          result.getUnprocessedElements() == null ?
+              Collections.emptyList() :
+              result.getUnprocessedElements();
+
+      CommittedBundle<?> unprocessedBundle =
+          inputBundle == null ? null : 
inputBundle.withElements(unprocessedElements);
+      return CommittedResult.create(result,
+          unprocessedBundle,
+          Collections.<CommittedBundle<?>>emptyList());
     }
 
     @Override


Reply via email to