Port ViewOverrideFactory to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c5b57ea Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c5b57ea Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c5b57ea Branch: refs/heads/gearpump-runner Commit: 8c5b57ea8445cd50a35c6dffb460dcf0f426e700 Parents: b4c7716 Author: Kenneth Knowles <k...@google.com> Authored: Fri May 26 14:26:55 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Jun 9 19:56:52 2017 -0700 ---------------------------------------------------------------------- .../CreatePCollectionViewTranslation.java | 4 +- .../runners/direct/ViewOverrideFactory.java | 48 ++++++++++++-------- .../direct/ViewEvaluatorFactoryTest.java | 3 +- .../runners/direct/ViewOverrideFactoryTest.java | 23 ++++++++-- .../beam/sdk/values/PCollectionViews.java | 10 ++++ 5 files changed, 62 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java index aa24909..8fc99b9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java @@ -56,8 +56,8 @@ public class CreatePCollectionViewTranslation { @Deprecated public static <ElemT, ViewT> PCollectionView<ViewT> getView( AppliedPTransform< - PCollection<ElemT>, PCollectionView<ViewT>, - PTransform<PCollection<ElemT>, PCollectionView<ViewT>>> + PCollection<ElemT>, PCollection<ElemT>, + PTransform<PCollection<ElemT>, PCollection<ElemT>>> application) throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java index 06a7388..5dcf016 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -18,8 +18,9 @@ package org.apache.beam.runners.direct; +import java.io.IOException; import java.util.Map; -import org.apache.beam.runners.core.construction.ForwardingPTransform; +import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.runners.core.construction.ReplacementOutputs; @@ -43,16 +44,30 @@ import org.apache.beam.sdk.values.TupleTag; */ class ViewOverrideFactory<ElemT, ViewT> implements PTransformOverrideFactory< - PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> { + PCollection<ElemT>, PCollection<ElemT>, + PTransform<PCollection<ElemT>, PCollection<ElemT>>> { @Override public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform( AppliedPTransform< - PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> + PCollection<ElemT>, PCollection<ElemT>, + PTransform<PCollection<ElemT>, PCollection<ElemT>>> transform) { - return PTransformReplacement.of( + + PCollectionView<ViewT> view; + try { + view = CreatePCollectionViewTranslation.getView(transform); + } catch (IOException exc) { + throw new RuntimeException( + String.format( + "Could not extract %s from transform %s", + PCollectionView.class.getSimpleName(), transform), + exc); + } + + return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new GroupAndWriteView<>(transform.getTransform())); + new GroupAndWriteView<ElemT, ViewT>(view)); } @Override @@ -63,11 +78,11 @@ class ViewOverrideFactory<ElemT, ViewT> /** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */ static class GroupAndWriteView<ElemT, ViewT> - extends ForwardingPTransform<PCollection<ElemT>, PCollection<ElemT>> { - private final CreatePCollectionView<ElemT, ViewT> og; + extends PTransform<PCollection<ElemT>, PCollection<ElemT>> { + private final PCollectionView<ViewT> view; - private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) { - this.og = og; + private GroupAndWriteView(PCollectionView<ViewT> view) { + this.view = view; } @Override @@ -77,14 +92,9 @@ class ViewOverrideFactory<ElemT, ViewT> .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())) .apply(GroupByKey.<Void, ElemT>create()) .apply(Values.<Iterable<ElemT>>create()) - .apply(new WriteView<ElemT, ViewT>(og)); + .apply(new WriteView<ElemT, ViewT>(view)); return input; } - - @Override - protected PTransform<PCollection<ElemT>, PCollection<ElemT>> delegate() { - return og; - } } /** @@ -96,10 +106,10 @@ class ViewOverrideFactory<ElemT, ViewT> */ static final class WriteView<ElemT, ViewT> extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> { - private final CreatePCollectionView<ElemT, ViewT> og; + private final PCollectionView<ViewT> view; - WriteView(CreatePCollectionView<ElemT, ViewT> og) { - this.og = og; + WriteView(PCollectionView<ViewT> view) { + this.view = view; } @Override @@ -112,7 +122,7 @@ class ViewOverrideFactory<ElemT, ViewT> @SuppressWarnings("deprecation") public PCollectionView<ViewT> getView() { - return og.getView(); + return view; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index ad1aecc..5bc48b7 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -66,7 +66,8 @@ public class ViewEvaluatorFactoryTest { .apply(GroupByKey.<Void, String>create()) .apply(Values.<Iterable<String>>create()); PCollection<Iterable<String>> view = - concat.apply(new ViewOverrideFactory.WriteView<>(createView)); + concat.apply( + new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView())); EvaluationContext context = mock(EvaluationContext.class); TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java index 94728c7..6af9273 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -36,8 +37,11 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; @@ -67,7 +71,7 @@ public class ViewOverrideFactoryTest implements Serializable { factory.getReplacementTransform( AppliedPTransform .<PCollection<Integer>, PCollection<Integer>, - CreatePCollectionView<Integer, List<Integer>>> + PTransform<PCollection<Integer>, PCollection<Integer>>> of( "foo", ints.expand(), @@ -102,7 +106,7 @@ public class ViewOverrideFactoryTest implements Serializable { factory.getReplacementTransform( AppliedPTransform .<PCollection<Integer>, PCollection<Integer>, - CreatePCollectionView<Integer, List<Integer>>> + PTransform<PCollection<Integer>, PCollection<Integer>>> of( "foo", ints.expand(), @@ -120,8 +124,19 @@ public class ViewOverrideFactoryTest implements Serializable { "There should only be one WriteView primitive in the graph", writeViewVisited.getAndSet(true), is(false)); - PCollectionView replacementView = ((WriteView) node.getTransform()).getView(); - assertThat(replacementView, Matchers.<PCollectionView>theInstance(view)); + PCollectionView<?> replacementView = ((WriteView) node.getTransform()).getView(); + + // replacementView.getPCollection() is null, but that is not a requirement + // so not asserted one way or the other + assertThat( + replacementView.getTagInternal(), + equalTo(view.getTagInternal())); + assertThat( + replacementView.getViewFn(), + Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn())); + assertThat( + replacementView.getWindowMappingFn(), + Matchers.<WindowMappingFn<?>>equalTo(view.getWindowMappingFn())); assertThat(node.getInputs().entrySet(), hasSize(1)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java index 5e2e2c3..0c04370 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java @@ -282,6 +282,16 @@ public class PCollectionViews { } })); } + + @Override + public boolean equals(Object other) { + return other instanceof ListViewFn; + } + + @Override + public int hashCode() { + return ListViewFn.class.hashCode(); + } } /**