Add Pipeline rehydration from proto
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43481595 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43481595 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43481595 Branch: refs/heads/master Commit: 43481595ebc854f4a7188609fd53267497e68124 Parents: 12c277f Author: Kenneth Knowles <k...@google.com> Authored: Fri May 26 11:22:50 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Jul 24 18:53:26 2017 -0700 ---------------------------------------------------------------------- .../construction/PTransformTranslation.java | 8 + .../core/construction/PipelineTranslation.java | 280 +++++++++++++++++++ .../core/construction/RehydratedComponents.java | 3 +- .../core/construction/SdkComponents.java | 52 ---- .../construction/PipelineTranslationTest.java | 199 +++++++++++++ .../core/construction/SdkComponentsTest.java | 107 ------- .../src/main/proto/beam_runner_api.proto | 4 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 15 +- .../beam/sdk/runners/TransformHierarchy.java | 69 +++++ 9 files changed, 574 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index d459645..b8365c9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -92,6 +92,7 @@ public class PTransformTranslation { List<AppliedPTransform<?, ?, ?>> subtransforms, SdkComponents components) throws IOException { + // TODO include DisplayData https://issues.apache.org/jira/browse/BEAM-2645 RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder(); for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) { checkArgument( @@ -136,6 +137,7 @@ public class PTransformTranslation { } transformBuilder.setSpec(payload); } + rawPTransform.registerComponents(components); } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { FunctionSpec payload = KNOWN_PAYLOAD_TRANSLATORS @@ -225,6 +227,8 @@ public class PTransformTranslation { public Any getPayload() { return null; } + + public void registerComponents(SdkComponents components) {} } /** @@ -255,6 +259,10 @@ public class PTransformTranslation { transformSpec.setParameter(payload); } + // Transforms like Combine may have Coders that need to be added but do not + // occur in a black-box traversal + transform.getTransform().registerComponents(components); + return transformSpec.build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java new file mode 100644 index 0000000..9e4839a --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.protobuf.Any; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; + +/** Utilities for going to/from Runner API pipelines. */ +public class PipelineTranslation { + + public static RunnerApi.Pipeline toProto(final Pipeline pipeline) { + final SdkComponents components = SdkComponents.create(); + final Collection<String> rootIds = new HashSet<>(); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children = + ArrayListMultimap.create(); + + @Override + public void leaveCompositeTransform(Node node) { + if (node.isRootNode()) { + for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) { + rootIds.add(components.getExistingPTransformId(pipelineRoot)); + } + } else { + // TODO: Include DisplayData in the proto + children.put(node.getEnclosingNode(), node.toAppliedPTransform(pipeline)); + try { + components.registerPTransform( + node.toAppliedPTransform(pipeline), children.get(node)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void visitPrimitiveTransform(Node node) { + // TODO: Include DisplayData in the proto + children.put(node.getEnclosingNode(), node.toAppliedPTransform(pipeline)); + try { + components.registerPTransform( + node.toAppliedPTransform(pipeline), + Collections.<AppliedPTransform<?, ?, ?>>emptyList()); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + }); + return RunnerApi.Pipeline.newBuilder() + .setComponents(components.toComponents()) + .addAllRootTransformIds(rootIds) + .build(); + } + + private static DisplayData evaluateDisplayData(HasDisplayData component) { + return DisplayData.from(component); + } + + public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto) + throws IOException { + TransformHierarchy transforms = new TransformHierarchy(); + Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create()); + + // Keeping the PCollections straight is a semantic necessity, but being careful not to explode + // the number of coders and windowing strategies is also nice, and helps testing. + RehydratedComponents rehydratedComponents = + RehydratedComponents.forComponents(pipelineProto.getComponents()).withPipeline(pipeline); + + for (String rootId : pipelineProto.getRootTransformIdsList()) { + addRehydratedTransform( + transforms, + pipelineProto.getComponents().getTransformsOrThrow(rootId), + pipeline, + pipelineProto.getComponents().getTransformsMap(), + rehydratedComponents); + } + + return pipeline; + } + + private static void addRehydratedTransform( + TransformHierarchy transforms, + RunnerApi.PTransform transformProto, + Pipeline pipeline, + Map<String, RunnerApi.PTransform> transformProtos, + RehydratedComponents rehydratedComponents) + throws IOException { + + Map<TupleTag<?>, PValue> rehydratedInputs = new HashMap<>(); + for (Map.Entry<String, String> inputEntry : transformProto.getInputsMap().entrySet()) { + rehydratedInputs.put( + new TupleTag<>(inputEntry.getKey()), + rehydratedComponents.getPCollection(inputEntry.getValue())); + } + + Map<TupleTag<?>, PValue> rehydratedOutputs = new HashMap<>(); + for (Map.Entry<String, String> outputEntry : transformProto.getOutputsMap().entrySet()) { + rehydratedOutputs.put( + new TupleTag<>(outputEntry.getKey()), + rehydratedComponents.getPCollection(outputEntry.getValue())); + } + + RunnerApi.FunctionSpec transformSpec = transformProto.getSpec(); + + // By default, no "additional" inputs, since that is an SDK-specific thing. + // Only ParDo really separates main from side inputs + Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap(); + + // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674 + if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) { + RunnerApi.ParDoPayload payload = + transformSpec.getParameter().unpack(RunnerApi.ParDoPayload.class); + + List<PCollectionView<?>> views = new ArrayList<>(); + for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry : + payload.getSideInputsMap().entrySet()) { + String localName = sideInputEntry.getKey(); + RunnerApi.SideInput sideInput = sideInputEntry.getValue(); + PCollection<?> pCollection = + (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName))); + views.add( + ParDoTranslation.viewFromProto( + sideInputEntry.getValue(), + sideInputEntry.getKey(), + pCollection, + transformProto, + rehydratedComponents)); + } + additionalInputs = PCollectionViews.toAdditionalInputs(views); + } + + // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674 + List<Coder<?>> additionalCoders = Collections.emptyList(); + if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) { + RunnerApi.CombinePayload payload = + transformSpec.getParameter().unpack(RunnerApi.CombinePayload.class); + additionalCoders = + (List) + Collections.singletonList( + rehydratedComponents.getCoder(payload.getAccumulatorCoderId())); + } + + RehydratedPTransform transform = + RehydratedPTransform.of( + transformSpec.getUrn(), + transformSpec.getParameter(), + additionalInputs, + additionalCoders); + + if (isPrimitive(transformProto)) { + transforms.addFinalizedPrimitiveNode( + transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs); + } else { + transforms.pushFinalizedNode( + transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs); + + for (String childTransformId : transformProto.getSubtransformsList()) { + addRehydratedTransform( + transforms, + transformProtos.get(childTransformId), + pipeline, + transformProtos, + rehydratedComponents); + } + + transforms.popNode(); + } + } + + // A primitive transform is one with outputs that are not in its input and also + // not produced by a subtransform. + private static boolean isPrimitive(RunnerApi.PTransform transformProto) { + return transformProto.getSubtransformsCount() == 0 + && !transformProto + .getInputsMap() + .values() + .containsAll(transformProto.getOutputsMap().values()); + } + + @AutoValue + abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> { + + @Nullable + public abstract String getUrn(); + + @Nullable + public abstract Any getPayload(); + + @Override + public abstract Map<TupleTag<?>, PValue> getAdditionalInputs(); + + public abstract List<Coder<?>> getCoders(); + + public static RehydratedPTransform of( + String urn, + Any payload, + Map<TupleTag<?>, PValue> additionalInputs, + List<Coder<?>> additionalCoders) { + return new AutoValue_PipelineTranslation_RehydratedPTransform( + urn, payload, additionalInputs, additionalCoders); + } + + @Override + public POutput expand(PInput input) { + throw new IllegalStateException( + String.format( + "%s should never be asked to expand;" + + " it is the result of deserializing an already-constructed Pipeline", + getClass().getSimpleName())); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("urn", getUrn()) + .add("payload", getPayload()) + .toString(); + } + + @Override + public void registerComponents(SdkComponents components) { + for (Coder<?> coder : getCoders()) { + try { + components.registerCoder(coder); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java index a9a34d7..ccdd4a7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java @@ -93,7 +93,8 @@ public class RehydratedComponents { PCollection.class.getSimpleName(), Pipeline.class.getSimpleName()); return PCollectionTranslation.fromProto( - components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this); + components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this) + .setName(id); } }); http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 0d3ba60..54d2e9d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -22,24 +22,16 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Equivalence; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; -import com.google.common.collect.ListMultimap; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.values.PCollection; @@ -62,50 +54,6 @@ public class SdkComponents { return new SdkComponents(); } - public static RunnerApi.Pipeline translatePipeline(Pipeline pipeline) { - final SdkComponents components = create(); - final Collection<String> rootIds = new HashSet<>(); - pipeline.traverseTopologically( - new PipelineVisitor.Defaults() { - private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children = - ArrayListMultimap.create(); - - @Override - public void leaveCompositeTransform(Node node) { - if (node.isRootNode()) { - for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) { - rootIds.add(components.getExistingPTransformId(pipelineRoot)); - } - } else { - children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline())); - try { - components.registerPTransform( - node.toAppliedPTransform(getPipeline()), children.get(node)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - @Override - public void visitPrimitiveTransform(Node node) { - children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline())); - try { - components.registerPTransform( - node.toAppliedPTransform(getPipeline()), - Collections.<AppliedPTransform<?, ?, ?>>emptyList()); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - }); - // TODO: Display Data - return RunnerApi.Pipeline.newBuilder() - .setComponents(components.toComponents()) - .addAllRootTransformIds(rootIds) - .build(); - } - private SdkComponents() { this.componentsBuilder = RunnerApi.Components.newBuilder(); this.transformIds = HashBiMap.create(); http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java new file mode 100644 index 0000000..9e6dff4 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.base.Equivalence; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link PipelineTranslation}. */ +@RunWith(Parameterized.class) +public class PipelineTranslationTest { + @Parameter(0) + public Pipeline pipeline; + + @Parameters(name = "{index}") + public static Iterable<Pipeline> testPipelines() { + Pipeline trivialPipeline = Pipeline.create(); + trivialPipeline.apply(Create.of(1, 2, 3)); + + Pipeline sideInputPipeline = Pipeline.create(); + final PCollectionView<String> singletonView = + sideInputPipeline.apply(Create.of("foo")).apply(View.<String>asSingleton()); + sideInputPipeline + .apply(Create.of("main input")) + .apply( + ParDo.of( + new DoFn<String, String>() { + @ProcessElement + public void process(ProcessContext c) { + // actually never executed and no effect on translation + c.sideInput(singletonView); + } + }) + .withSideInputs(singletonView)); + + Pipeline complexPipeline = Pipeline.create(); + BigEndianLongCoder customCoder = BigEndianLongCoder.of(); + PCollection<Long> elems = complexPipeline.apply(GenerateSequence.from(0L).to(207L)); + PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder); + PCollection<Long> windowed = + counted.apply( + Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7))) + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(19))) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.standardMinutes(3L))); + final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy(); + PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo")); + PCollection<KV<String, Iterable<Long>>> grouped = + keyed.apply(GroupByKey.<String, Long>create()); + + return ImmutableList.of(trivialPipeline, sideInputPipeline, complexPipeline); + } + + @Test + public void testProtoDirectly() { + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); + pipeline.traverseTopologically( + new PipelineProtoVerificationVisitor(pipelineProto)); + } + + @Test + public void testProtoAgainstRehydrated() throws Exception { + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); + Pipeline rehydrated = PipelineTranslation.fromProto(pipelineProto); + + rehydrated.traverseTopologically( + new PipelineProtoVerificationVisitor(pipelineProto)); + } + + private static class PipelineProtoVerificationVisitor extends PipelineVisitor.Defaults { + + private final RunnerApi.Pipeline pipelineProto; + Set<Node> transforms; + Set<PCollection<?>> pcollections; + Set<Equivalence.Wrapper<? extends Coder<?>>> coders; + Set<WindowingStrategy<?, ?>> windowingStrategies; + + public PipelineProtoVerificationVisitor(RunnerApi.Pipeline pipelineProto) { + this.pipelineProto = pipelineProto; + transforms = new HashSet<>(); + pcollections = new HashSet<>(); + coders = new HashSet<>(); + windowingStrategies = new HashSet<>(); + } + + @Override + public void leaveCompositeTransform(Node node) { + if (node.isRootNode()) { + assertThat( + "Unexpected number of PTransforms", + pipelineProto.getComponents().getTransformsCount(), + equalTo(transforms.size())); + assertThat( + "Unexpected number of PCollections", + pipelineProto.getComponents().getPcollectionsCount(), + equalTo(pcollections.size())); + assertThat( + "Unexpected number of Coders", + pipelineProto.getComponents().getCodersCount(), + equalTo(coders.size())); + assertThat( + "Unexpected number of Windowing Strategies", + pipelineProto.getComponents().getWindowingStrategiesCount(), + equalTo(windowingStrategies.size())); + } else { + transforms.add(node); + if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals( + PTransformTranslation.urnForTransformOrNull(node.getTransform()))) { + // Combine translation introduces a coder that is not assigned to any PCollection + // in the default expansion, and must be explicitly added here. + try { + addCoders( + CombineTranslation.getAccumulatorCoder(node.toAppliedPTransform(getPipeline()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + @Override + public void visitPrimitiveTransform(Node node) { + transforms.add(node); + } + + @Override + public void visitValue(PValue value, Node producer) { + if (value instanceof PCollection) { + PCollection pc = (PCollection) value; + pcollections.add(pc); + addCoders(pc.getCoder()); + windowingStrategies.add(pc.getWindowingStrategy()); + addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder()); + } + } + + private void addCoders(Coder<?> coder) { + coders.add(Equivalence.<Coder<?>>identity().wrap(coder)); + if (CoderTranslation.KNOWN_CODER_URNS.containsKey(coder.getClass())) { + for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) { + addCoders(component); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index ce6a99f..82840d6 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -24,43 +24,25 @@ import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -import com.google.common.base.Equivalence; import java.io.IOException; import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; import org.hamcrest.Matchers; -import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -78,95 +60,6 @@ public class SdkComponentsTest { private SdkComponents components = SdkComponents.create(); @Test - public void translatePipeline() { - BigEndianLongCoder customCoder = BigEndianLongCoder.of(); - PCollection<Long> elems = pipeline.apply(GenerateSequence.from(0L).to(207L)); - PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder); - PCollection<Long> windowed = - counted.apply( - Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7))) - .triggering( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(19))) - .accumulatingFiredPanes() - .withAllowedLateness(Duration.standardMinutes(3L))); - final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy(); - PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo")); - PCollection<KV<String, Iterable<Long>>> grouped = - keyed.apply(GroupByKey.<String, Long>create()); - - final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline); - pipeline.traverseTopologically( - new PipelineVisitor.Defaults() { - Set<Node> transforms = new HashSet<>(); - Set<PCollection<?>> pcollections = new HashSet<>(); - Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>(); - Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>(); - - @Override - public void leaveCompositeTransform(Node node) { - if (node.isRootNode()) { - assertThat( - "Unexpected number of PTransforms", - pipelineProto.getComponents().getTransformsCount(), - equalTo(transforms.size())); - assertThat( - "Unexpected number of PCollections", - pipelineProto.getComponents().getPcollectionsCount(), - equalTo(pcollections.size())); - assertThat( - "Unexpected number of Coders", - pipelineProto.getComponents().getCodersCount(), - equalTo(coders.size())); - assertThat( - "Unexpected number of Windowing Strategies", - pipelineProto.getComponents().getWindowingStrategiesCount(), - equalTo(windowingStrategies.size())); - } else { - transforms.add(node); - if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals( - PTransformTranslation.urnForTransformOrNull(node.getTransform()))) { - // Combine translation introduces a coder that is not assigned to any PCollection - // in the default expansion, and must be explicitly added here. - try { - addCoders( - CombineTranslation.getAccumulatorCoder( - node.toAppliedPTransform(getPipeline()))); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - } - - @Override - public void visitPrimitiveTransform(Node node) { - transforms.add(node); - } - - @Override - public void visitValue(PValue value, Node producer) { - if (value instanceof PCollection) { - PCollection pc = (PCollection) value; - pcollections.add(pc); - addCoders(pc.getCoder()); - windowingStrategies.add(pc.getWindowingStrategy()); - addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder()); - } - } - - private void addCoders(Coder<?> coder) { - coders.add(Equivalence.<Coder<?>>identity().wrap(coder)); - if (coder instanceof StructuredCoder) { - for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) { - addCoders(component); - } - } - } - }); - } - - @Test public void registerCoder() throws IOException { Coder<?> coder = KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of()))); http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 711da2a..0c433fa 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -92,7 +92,9 @@ message Pipeline { // this pipeline. Components components = 1; - // (Required) The ids of all PTransforms that are not contained within another PTransform + // (Required) The ids of all PTransforms that are not contained within another PTransform. + // These must be in shallow topological order, so that traversing them recursively + // in this order yields a recursively topological traversal. repeated string root_transform_ids = 2; // (Optional) Static display data for the pipeline. If there is none, http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index bdf8a12..760efb3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -180,6 +180,12 @@ public class Pipeline { return begin().apply(name, root); } + @Internal + public static Pipeline forTransformHierarchy( + TransformHierarchy transforms, PipelineOptions options) { + return new Pipeline(transforms, options); + } + /** * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> * @@ -476,16 +482,21 @@ public class Pipeline { ///////////////////////////////////////////////////////////////////////////// // Below here are internal operations, never called by users. - private final TransformHierarchy transforms = new TransformHierarchy(); + private final TransformHierarchy transforms; private Set<String> usedFullNames = new HashSet<>(); private CoderRegistry coderRegistry; private final List<String> unstableNames = new ArrayList<>(); private final PipelineOptions defaultOptions; - protected Pipeline(PipelineOptions options) { + private Pipeline(TransformHierarchy transforms, PipelineOptions options) { + this.transforms = transforms; this.defaultOptions = options; } + protected Pipeline(PipelineOptions options) { + this(new TransformHierarchy(), options); + } + @Override public String toString() { return "Pipeline#" + hashCode(); http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index d8ff59e..c2d5771 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior; @@ -98,6 +99,48 @@ public class TransformHierarchy { return current; } + @Internal + public Node pushFinalizedNode( + String name, + Map<TupleTag<?>, PValue> inputs, + PTransform<?, ?> transform, + Map<TupleTag<?>, PValue> outputs) { + checkNotNull( + transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName()); + checkNotNull( + name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName()); + checkNotNull( + inputs, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName()); + Node node = new Node(current, transform, name, inputs, outputs); + node.finishedSpecifying = true; + current.addComposite(node); + current = node; + return current; + } + + @Internal + public Node addFinalizedPrimitiveNode( + String name, + Map<TupleTag<?>, PValue> inputs, + PTransform<?, ?> transform, + Map<TupleTag<?>, PValue> outputs) { + checkNotNull( + transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName()); + checkNotNull( + name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName()); + checkNotNull( + inputs, "Inputs must be provided for all %s Nodes", PTransform.class.getSimpleName()); + checkNotNull( + outputs, "Outputs must be provided for all %s Nodes", PTransform.class.getSimpleName()); + Node node = new Node(current, transform, name, inputs, outputs); + node.finishedSpecifying = true; + for (PValue output : outputs.values()) { + producers.put(output, node); + } + current.addComposite(node); + return node; + } + public Node replaceNode(Node existing, PInput input, PTransform<?, ?> transform) { checkNotNull(existing); checkNotNull(input); @@ -321,6 +364,32 @@ public class TransformHierarchy { } /** + * Creates a new {@link Node} with the given parent and transform, where inputs and outputs + * are already known. + * + * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other + * nodes. + * + * @param enclosingNode the composite node containing this node + * @param transform the PTransform tracked by this node + * @param fullName the fully qualified name of the transform + * @param inputs the expanded inputs to the transform + * @param outputs the expanded outputs of the transform + */ + private Node( + @Nullable Node enclosingNode, + @Nullable PTransform<?, ?> transform, + String fullName, + @Nullable Map<TupleTag<?>, PValue> inputs, + @Nullable Map<TupleTag<?>, PValue> outputs) { + this.enclosingNode = enclosingNode; + this.transform = transform; + this.fullName = fullName; + this.inputs = inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs; + this.outputs = outputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : outputs; + } + + /** * Returns the transform associated with this transform node. */ public PTransform<?, ?> getTransform() {