Cache read SideInput Contents in the InProcessSideInputContainer. This ensures that, while processing a bundle, all elements see the same contents for any SideInput Window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49689fce Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49689fce Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49689fce Branch: refs/heads/master Commit: 49689fced52e29d7efd386202265d0a105fab276 Parents: a25fd05 Author: Thomas Groh <tg...@google.com> Authored: Tue May 10 13:22:20 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Wed May 11 13:01:26 2016 -0700 ---------------------------------------------------------------------- .../direct/InProcessSideInputContainer.java | 33 +++++++++++++++----- .../direct/InProcessEvaluationContextTest.java | 3 ++ .../direct/InProcessSideInputContainerTest.java | 15 +++++++-- 3 files changed, 42 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java index 1ef8f13..96a9ad2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -211,9 +212,13 @@ class 
InProcessSideInputContainer { private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader { private final Collection<PCollectionView<?>> readerViews; + private final LoadingCache< + PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> + viewContents; private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) { this.readerViews = ImmutableSet.copyOf(readerViews); + this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader()); } @Override @@ -224,22 +229,23 @@ class InProcessSideInputContainer { + "Contained views; %s", view, readerViews); - return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null; + return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent(); } @Override @Nullable public <T> T get(final PCollectionView<T> view, final BoundedWindow window) { - checkArgument(readerViews.contains(view), - "calling get(PCollectionView) with unknown view: " + view); - checkArgument(isReady(view, window), - "calling get(PCollectionView) with view %s that is not ready in window %s", + checkArgument( + readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); + checkArgument( + isReady(view, window), + "calling get() on a PCollectionView %s that is not ready in window %s", view, window); // Safe covariant cast @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values = - (Iterable<WindowedValue<?>>) viewByWindows - .getUnchecked(PCollectionViewWindow.of(view, window)).get(); + (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view, + window)).get(); return view.fromIterableInternal(values); } @@ -254,4 +260,17 @@ class InProcessSideInputContainer { } } + /** + * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into + * an optional. 
+ */ + private class CurrentViewContentsLoader extends CacheLoader< + PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> { + + @Override + public Optional<? extends Iterable<? extends WindowedValue<?>>> + load(PCollectionViewWindow<?> key) { + return Optional.fromNullable(viewByWindows.getUnchecked(key).get()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java index b73e41a..10b8721 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java @@ -150,6 +150,9 @@ public class InProcessEvaluationContextTest { WindowedValue.of( 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); viewWriter.add(Collections.singleton(overrittenSecondValue)); + assertThat(reader.get(view, second), containsInAnyOrder(2)); + // The cached value is served in the earlier reader + reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view)); assertThat(reader.get(view, second), containsInAnyOrder(4444)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java index 2f376dd..746c0f8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java @@ -399,7 +399,8 @@ public class InProcessSideInputContainerTest { FIRST_WINDOW.maxTimestamp().minus(100L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); + // Cached value is false + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); container.write( singletonView, @@ -410,10 +411,15 @@ public class InProcessSideInputContainerTest { SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); } @Test @@ -431,6 +437,11 @@ public class InProcessSideInputContainerTest { if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) { fail("Callback to set empty values did not complete!"); } + // The cached value was false, so it continues to be false + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + // A new reader for the same container gets a fresh look + reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), 
is(true)); }