Allow InProcess TransformEvaluators to refuse inputs. Inputs that cannot be processed (generally because a side input is not yet ready) can be added to a list of unprocessed elements; the runner then reschedules those elements to be executed at a later point.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0518fc68 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0518fc68 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0518fc68 Branch: refs/heads/master Commit: 0518fc687febca6dca75159924265fdc6196a572 Parents: 59cca8d Author: Thomas Groh <tg...@google.com> Authored: Tue May 3 13:24:25 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri May 6 11:17:37 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/CommittedResult.java | 16 ++- .../direct/ExecutorServiceParallelExecutor.java | 6 + .../direct/InMemoryWatermarkManager.java | 7 +- .../direct/InProcessEvaluationContext.java | 6 +- .../direct/InProcessTransformResult.java | 7 + .../runners/direct/StepTransformResult.java | 17 +++ .../runners/direct/CommittedResultTest.java | 34 ++++- .../direct/InMemoryWatermarkManagerTest.java | 127 ++++++++++++++++--- .../runners/direct/TransformExecutorTest.java | 11 +- 9 files changed, 209 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java index d15e012..4a42e34 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import com.google.auto.value.AutoValue; +import 
javax.annotation.Nullable; + /** * A {@link InProcessTransformResult} that has been committed. */ @@ -34,13 +36,25 @@ abstract class CommittedResult { public abstract AppliedPTransform<?, ?, ?> getTransform(); /** + * Returns the {@link CommittedBundle} that contains the input elements that could not be + * processed by the evaluation. + * + * <p>{@code null} if the input bundle was null. + */ + @Nullable + public abstract CommittedBundle<?> getUnprocessedInputs(); + + /** * Returns the outputs produced by the transform. */ public abstract Iterable<? extends CommittedBundle<?>> getOutputs(); public static CommittedResult create( - InProcessTransformResult original, Iterable<? extends CommittedBundle<?>> outputs) { + InProcessTransformResult original, + CommittedBundle<?> unprocessedElements, + Iterable<? extends CommittedBundle<?>> outputs) { return new AutoValue_CommittedResult(original.getTransform(), + unprocessedElements, outputs); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index fd4cc2c..570ddc4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -37,6 +37,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import org.joda.time.Instant; import org.slf4j.Logger; @@ -230,6 +231,11 @@ final class 
ExecutorServiceParallelExecutor implements InProcessExecutor { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle, valueToConsumers.get(outputBundle.getPCollection()))); } + CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs(); + if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) { + allUpdates.offer(ExecutorUpdate.fromBundle(unprocessedInputs, + Collections.<AppliedPTransform<?, ?, ?>>singleton(committedResult.getTransform()))); + } return committedResult; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java index 4d5a3a1..87ea4d5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java @@ -849,8 +849,6 @@ public class InMemoryWatermarkManager { CommittedBundle<?> input, TimerUpdate timerUpdate, CommittedResult result) { - TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform()); - // Newly pending elements must be added before completed elements are removed, as the two // do not share a Mutex within this call and thus can be interleaved with external calls to // refresh. 
@@ -861,6 +859,11 @@ public class InMemoryWatermarkManager { } } + TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform()); + if (input != null) { + // Add the unprocessed inputs + completedTransform.addPending(result.getUnprocessedInputs()); + } completedTransform.updateTimers(timerUpdate); if (input != null) { completedTransform.removePending(input); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java index d4f891e..5c19287 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -152,7 +152,11 @@ class InProcessEvaluationContext { Iterable<? extends CommittedBundle<?>> committedBundles = commitBundles(result.getOutputBundles()); // Update watermarks and timers - CommittedResult committedResult = CommittedResult.create(result, committedBundles); + CommittedResult committedResult = CommittedResult.create(result, + completedBundle == null ? 
+ null : + completedBundle.withElements((Iterable) result.getUnprocessedElements()), + committedBundles); watermarkManager.updateWatermarks( completedBundle, result.getTimerUpdate().withCompletedTimers(completedTimers), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java index a132c33..0bc3ea1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.common.CounterSet; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; @@ -45,6 +46,12 @@ public interface InProcessTransformResult { Iterable<? extends UncommittedBundle<?>> getOutputBundles(); /** + * Returns elements that were provided to the {@link TransformEvaluator} as input but were not + * processed. + */ + Iterable<? extends WindowedValue<?>> getUnprocessedElements(); + + /** * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did * not use a {@link CounterSet}. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index 46e7d04..b2e3897 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.common.CounterSet; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; @@ -41,6 +42,7 @@ import javax.annotation.Nullable; public class StepTransformResult implements InProcessTransformResult { private final AppliedPTransform<?, ?, ?> transform; private final Iterable<? extends UncommittedBundle<?>> bundles; + private final Iterable<? extends WindowedValue<?>> unprocessedElements; @Nullable private final CopyOnAccessInMemoryStateInternals<?> state; private final TimerUpdate timerUpdate; @Nullable private final CounterSet counters; @@ -49,12 +51,14 @@ public class StepTransformResult implements InProcessTransformResult { private StepTransformResult( AppliedPTransform<?, ?, ?> transform, Iterable<? extends UncommittedBundle<?>> outputBundles, + Iterable<? 
extends WindowedValue<?>> unprocessedElements, CopyOnAccessInMemoryStateInternals<?> state, TimerUpdate timerUpdate, CounterSet counters, Instant watermarkHold) { this.transform = checkNotNull(transform); this.bundles = checkNotNull(outputBundles); + this.unprocessedElements = checkNotNull(unprocessedElements); this.state = state; this.timerUpdate = checkNotNull(timerUpdate); this.counters = counters; @@ -67,6 +71,11 @@ public class StepTransformResult implements InProcessTransformResult { } @Override + public Iterable<? extends WindowedValue<?>> getUnprocessedElements() { + return unprocessedElements; + } + + @Override public CounterSet getCounters() { return counters; } @@ -113,6 +122,7 @@ public class StepTransformResult implements InProcessTransformResult { public static class Builder { private final AppliedPTransform<?, ?, ?> transform; private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder; + private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder; private CopyOnAccessInMemoryStateInternals<?> state; private TimerUpdate timerUpdate; private CounterSet counters; @@ -122,6 +132,7 @@ public class StepTransformResult implements InProcessTransformResult { this.transform = transform; this.watermarkHold = watermarkHold; this.bundlesBuilder = ImmutableList.builder(); + this.unprocessedElementsBuilder = ImmutableList.builder(); this.timerUpdate = TimerUpdate.builder(null).build(); } @@ -129,6 +140,7 @@ public class StepTransformResult implements InProcessTransformResult { return new StepTransformResult( transform, bundlesBuilder.build(), + unprocessedElementsBuilder.build(), state, timerUpdate, counters, @@ -150,6 +162,11 @@ public class StepTransformResult implements InProcessTransformResult { return this; } + public Builder addUnprocessedElements(Iterable<? 
extends WindowedValue<?>> unprocessed) { + unprocessedElementsBuilder.addAll(unprocessed); + return this; + } + public Builder addOutput( UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) { bundlesBuilder.add(outputBundle); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java index b30e005..0d1b464 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java @@ -18,11 +18,14 @@ package org.apache.beam.runners.direct; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -46,6 +49,7 @@ import java.util.List; @RunWith(JUnit4.class) public class CommittedResultTest implements Serializable { private transient TestPipeline p = TestPipeline.create(); + private transient PCollection<Integer> created = p.apply(Create.of(1, 2)); private transient AppliedPTransform<?, ?, ?> transform = AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() { }); @@ -55,12 +59,38 @@ public class CommittedResultTest implements Serializable { public void getTransformExtractsFromResult() { CommittedResult result = 
CommittedResult.create(StepTransformResult.withoutHold(transform).build(), + bundleFactory.createRootBundle(created).commit(Instant.now()), Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList()); assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform)); } @Test + public void getUncommittedElementsEqualInput() { + InProcessPipelineRunner.CommittedBundle<Integer> bundle = + bundleFactory.createRootBundle(created) + .add(WindowedValue.valueInGlobalWindow(2)) + .commit(Instant.now()); + CommittedResult result = + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), + bundle, + Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList()); + + assertThat(result.getUnprocessedInputs(), + Matchers.<InProcessPipelineRunner.CommittedBundle<?>>equalTo(bundle)); + } + + @Test + public void getUncommittedElementsNull() { + CommittedResult result = + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), + null, + Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList()); + + assertThat(result.getUnprocessedInputs(), nullValue()); + } + + @Test public void getOutputsEqualInput() { List<? 
extends InProcessPipelineRunner.CommittedBundle<?>> outputs = ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, @@ -70,7 +100,9 @@ public class CommittedResultTest implements Serializable { WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED)).commit(Instant.now())); CommittedResult result = - CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs); + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), + bundleFactory.createRootBundle(created).commit(Instant.now()), + outputs); assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java index 15cdf8a..b45440d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java @@ -71,6 +71,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import javax.annotation.Nullable; + /** * Tests for {@link InMemoryWatermarkManager}. 
*/ @@ -162,6 +164,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(output)), new Instant(8000L)); TransformWatermarks updatedSourceWatermark = @@ -181,6 +184,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(intsToFlatten.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -213,6 +217,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(secondPcollectionBundle, TimerUpdate.empty(), result(flattened.getProducingTransformInternal(), + secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)), null); TransformWatermarks transformAfterProcessing = @@ -220,6 +225,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(secondPcollectionBundle, TimerUpdate.empty(), result(flattened.getProducingTransformInternal(), + secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)), null); assertThat( @@ -237,6 +243,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)), new Instant(Long.MAX_VALUE)); TransformWatermarks firstSourceWatermarks = @@ -267,6 +274,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(firstPcollectionBundle, TimerUpdate.empty(), result(flattened.getProducingTransformInternal(), + 
firstPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)), null); TransformWatermarks afterConsumingAllInput = @@ -291,6 +299,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(createdBundle)), new Instant(Long.MAX_VALUE)); TransformWatermarks createdAfterProducing = @@ -306,6 +315,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(createdBundle, TimerUpdate.empty(), result(keyed.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(keyBundle)), null); TransformWatermarks keyedWatermarks = @@ -325,6 +335,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(createdBundle, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), null); TransformWatermarks filteredProcessedWatermarks = @@ -350,6 +361,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(createdBundle)), new Instant(Long.MAX_VALUE)); @@ -360,6 +372,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(createdBundle, TimerUpdate.empty(), result(keyed.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(keyBundle)), new Instant(500L)); TransformWatermarks keyedWatermarks = @@ -389,17 
+402,20 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, ImmutableList.of(firstKeyBundle, secondKeyBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(firstKeyBundle, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + firstKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), new Instant(-1000L)); manager.updateWatermarks(secondKeyBundle, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + secondKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), new Instant(1234L)); @@ -414,6 +430,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(fauxFirstKeyTimerBundle, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + fauxFirstKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -424,6 +441,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(fauxSecondKeyTimerBundle, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), new Instant(5678L)); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L))); @@ -431,6 +449,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(fauxSecondKeyTimerBundle, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), 
BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredWatermarks.getOutputWatermark(), @@ -447,6 +466,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(firstInput)), new Instant(0L)); TransformWatermarks firstWatermarks = @@ -458,6 +478,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(secondInput)), new Instant(-250L)); TransformWatermarks secondWatermarks = @@ -473,10 +494,12 @@ public class InMemoryWatermarkManagerTest implements Serializable { public void updateWatermarkWithHoldsShouldBeMonotonic() { CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts, TimestampedValue.of(1, new Instant(1_000_000L)), - TimestampedValue.of(2, new Instant(1234L)), - TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, + TimestampedValue.of(2, new Instant(1234L)), + TimestampedValue.of(3, new Instant(-1000L))); + manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(createdBundle)), new Instant(Long.MAX_VALUE)); @@ -487,6 +510,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(createdBundle, TimerUpdate.empty(), result(keyed.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(keyBundle)), new Instant(500L)); TransformWatermarks keyedWatermarks = @@ -505,6 +529,40 @@ public class InMemoryWatermarkManagerTest implements Serializable { 
assertThat(updatedWatermarks.getOutputWatermark(), equalTo(oldOutputWatermark)); } + @Test + public void updateWatermarkWithUnprocessedElements() { + WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(1); + WindowedValue<Integer> second = + WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L)); + WindowedValue<Integer> third = + WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L)); + CommittedBundle<Integer> createdBundle = bundleFactory.createRootBundle(createdInts) + .add(first) + .add(second) + .add(third) + .commit(clock.now()); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + null, + Collections.<CommittedBundle<?>>singleton(createdBundle)), + BoundedWindow.TIMESTAMP_MAX_VALUE); + + CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed, + TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE)); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + createdBundle.withElements(ImmutableList.of(second, third)), + Collections.<CommittedBundle<?>>singleton(keyBundle)), + BoundedWindow.TIMESTAMP_MAX_VALUE); + TransformWatermarks keyedWatermarks = + manager.getWatermarks(keyed.getProducingTransformInternal()); + // the unprocessed second and third are readded to pending + assertThat( + keyedWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L)))); + } + /** * Demonstrates that updateWatermarks in the presence of late data is monotonic. 
*/ @@ -517,6 +575,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(createdBundle)), sourceWatermark); @@ -528,6 +587,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(createdBundle, TimerUpdate.empty(), result(keyed.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(keyBundle)), null); TransformWatermarks onTimeWatermarks = @@ -542,6 +602,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(lateDataBundle)), new Instant(2_000_000L)); TransformWatermarks bufferedLateWm = @@ -560,6 +621,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(lateDataBundle, TimerUpdate.empty(), result(keyed.getProducingTransformInternal(), + lateDataBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)), null); } @@ -567,8 +629,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { public void updateWatermarkWithDifferentWindowedValueInstances() { manager.updateWatermarks( null, - TimerUpdate.empty(), - result(createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), null, Collections.<CommittedBundle<?>>singleton( bundleFactory .createRootBundle(createdInts) @@ -576,12 +637,14 @@ public class InMemoryWatermarkManagerTest implements Serializable { .commit(Instant.now()))), BoundedWindow.TIMESTAMP_MAX_VALUE); - 
manager.updateWatermarks( - bundleFactory.createRootBundle(createdInts) - .add(WindowedValue.valueInGlobalWindow(1)) - .commit(Instant.now()), + CommittedBundle<Integer> createdBundle = bundleFactory.createRootBundle(createdInts) + .add(WindowedValue.valueInGlobalWindow(1)) + .commit(Instant.now()); + manager.updateWatermarks(createdBundle, TimerUpdate.empty(), - result(keyed.getProducingTransformInternal(), Collections.<CommittedBundle<?>>emptyList()), + result(keyed.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), + Collections.<CommittedBundle<?>>emptyList()), null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -598,6 +661,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = @@ -627,6 +691,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(firstCreateOutput)), new Instant(12_000L)); @@ -634,6 +699,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(firstCreateOutput, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + firstCreateOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(firstFilterOutput)), new Instant(10_000L)); TransformWatermarks firstFilterWatermarks = @@ -645,6 +711,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, 
Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = @@ -687,6 +754,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(createOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = @@ -716,6 +784,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(createOutput, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + createOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filterOutputBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks filterAfterConsumed = @@ -737,8 +806,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { // @Test public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8); - manager.updateWatermarks(null, TimerUpdate.empty(), + manager.updateWatermarks(null, + TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(createdBundle)), new Instant(1248L)); @@ -759,6 +830,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(createdBundle, timers, result(filtered.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); Instant startTime = clock.now(); @@ -796,6 +868,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { TimerUpdate.builder("key") .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(), 
result(filtered.getProducingTransformInternal(), + filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredTimerResult)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -809,6 +882,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(filteredTimerResult, TimerUpdate.empty(), result(filteredTimesTwo.getProducingTransformInternal(), + filteredTimerResult.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -846,6 +920,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(createOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = @@ -859,6 +934,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(createSecondOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -871,6 +947,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(created)), new Instant(40_900L)); @@ -881,6 +958,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(created, TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(), result(filtered.getProducingTransformInternal(), + created.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), 
BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -901,6 +979,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { TimerUpdate.builder("key") .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(), result(filtered.getProducingTransformInternal(), + otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -914,6 +993,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>singleton(created)), new Instant(29_919_235L)); @@ -924,6 +1004,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { created, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), + created.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -946,7 +1027,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, TimerUpdate.empty(), - result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + result(createdInts.getProducingTransformInternal(), + null, + Collections.singleton(createdBundle)), new Instant(1500L)); TimerData earliestTimer = @@ -966,6 +1049,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(createdBundle, update, result(filtered.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); @@ -982,6 +1066,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), 
result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = @@ -1007,7 +1092,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, TimerUpdate.empty(), - result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + result(createdInts.getProducingTransformInternal(), + null, + Collections.singleton(createdBundle)), new Instant(1500L)); TimerData earliestTimer = @@ -1028,7 +1115,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle, update, result(filtered.getProducingTransformInternal(), - Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), + Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = @@ -1045,6 +1133,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = @@ -1070,7 +1159,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, TimerUpdate.empty(), - result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + result(createdInts.getProducingTransformInternal(), + null, + Collections.singleton(createdBundle)), new Instant(1500L)); TimerData earliestTimer = 
TimerData.of( @@ -1091,6 +1182,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { createdBundle, update, result(filtered.getProducingTransformInternal(), + createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); @@ -1109,6 +1201,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), + null, Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = @@ -1273,8 +1366,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { private final CommittedResult result( AppliedPTransform<?, ?, ?> transform, + @Nullable CommittedBundle<?> unprocessedBundle, Iterable<? extends CommittedBundle<?>> bundles) { - return CommittedResult.create(StepTransformResult.withoutHold(transform) - .build(), bundles); + return CommittedResult.create(StepTransformResult.withoutHold(transform).build(), + unprocessedBundle, + bundles); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 959e9d3..8b6053e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -491,7 +491,16 @@ public class TransformExecutorTest { CommittedBundle<?> inputBundle, InProcessTransformResult result) { handledResult = 
result; onMethod.countDown(); - return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList()); + @SuppressWarnings("rawtypes") Iterable unprocessedElements = + result.getUnprocessedElements() == null ? + Collections.emptyList() : + result.getUnprocessedElements(); + + CommittedBundle<?> unprocessedBundle = + inputBundle == null ? null : inputBundle.withElements(unprocessedElements); + return CommittedResult.create(result, + unprocessedBundle, + Collections.<CommittedBundle<?>>emptyList()); } @Override