Dehydrate then rehydrate Pipeline before DirectRunner.run()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ca45915 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ca45915 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ca45915 Branch: refs/heads/master Commit: 8ca459158888693839edb14f824fa6835ebe3e67 Parents: 4348159 Author: Kenneth Knowles <k...@google.com> Authored: Fri May 26 11:23:05 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Jul 24 18:53:26 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 11 +++++- .../runners/direct/ViewOverrideFactoryTest.java | 41 -------------------- 2 files changed, 10 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 4621224..c5f29e5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -31,6 +32,7 @@ import java.util.Set; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; @@ -156,7 +158,14 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { } @Override - public DirectPipelineResult run(Pipeline pipeline) { + public DirectPipelineResult run(Pipeline originalPipeline) { + Pipeline pipeline; + try { + pipeline = PipelineTranslation.fromProto( + PipelineTranslation.toProto(originalPipeline)); + } catch (IOException exception) { + throw new RuntimeException("Error preparing pipeline for direct execution.", exception); + } pipeline.replaceAll(defaultTransformOverrides()); MetricsEnvironment.setMetricsSupported(true); DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java index 6af9273..94d8d70 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java @@ -23,22 +23,17 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import com.google.common.collect.ImmutableSet; import java.io.Serializable; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement; import org.apache.beam.sdk.runners.TransformHierarchy.Node; -import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; @@ -62,42 +57,6 @@ public class ViewOverrideFactoryTest implements Serializable { new ViewOverrideFactory<>(); @Test - public void replacementSucceeds() { - PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3)); - final PCollectionView<List<Integer>> view = - PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder()); - PTransformReplacement<PCollection<Integer>, PCollection<Integer>> - replacementTransform = - factory.getReplacementTransform( - AppliedPTransform - .<PCollection<Integer>, PCollection<Integer>, - PTransform<PCollection<Integer>, PCollection<Integer>>> - of( - "foo", - ints.expand(), - view.expand(), - CreatePCollectionView.<Integer, List<Integer>>of(view), - p)); - ints.apply(replacementTransform.getTransform()); - - PCollection<Set<Integer>> outputViewContents = - p.apply("CreateSingleton", Create.of(0)) - .apply( - "OutputContents", - ParDo.of( - new DoFn<Integer, Set<Integer>>() { - @ProcessElement - public void outputSideInput(ProcessContext context) { - context.output(ImmutableSet.copyOf(context.sideInput(view))); - } - }) - .withSideInputs(view)); - PAssert.thatSingleton(outputViewContents).isEqualTo(ImmutableSet.of(1, 2, 3)); - - p.run(); - } - - @Test public void replacementGetViewReturnsOriginal() { final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3)); final PCollectionView<List<Integer>> view =