[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82057 ]
ASF GitHub Bot logged work on BEAM-3565: ---------------------------------------- Author: ASF GitHub Bot Created on: 19/Mar/18 21:32 Start Date: 19/Mar/18 21:32 Worklog Time Spent: 10m Work Description: tgroh closed pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index b45be09efb6..3ed90dd036e 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -203,6 +203,23 @@ message PCollection { DisplayData display_data = 5; } +// The payload for an executable stage. This will eventually be passed to an SDK in the form of a +// ProcessBundleDescriptor. +message ExecutableStagePayload { + + Environment environment = 1; + + // Input PCollection id. + string input = 2; + + // PTransform ids contained within this executable stage. + repeated string transforms = 3; + + // Output PCollection ids. + repeated string outputs = 4; + +} + // The payload for the primitive ParDo transform. message ParDoPayload { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java new file mode 100644 index 00000000000..1200dc621a7 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java @@ -0,0 +1,25 @@ +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.sdk.runners.AppliedPTransform; + +/** + * Utilities for converting {@link ExecutableStage}s to and from {@link RunnerApi} protocol buffers. + */ +public class ExecutableStageTranslation { + + /** Extracts an {@link ExecutableStagePayload} from the given transform. */ + public static ExecutableStagePayload getExecutableStagePayload( + AppliedPTransform<?, ?, ?> appliedTransform) throws IOException { + RunnerApi.PTransform transform = + PTransformTranslation.toProto(appliedTransform, SdkComponents.create()); + checkArgument(ExecutableStage.URN.equals(transform.getSpec().getUrn())); + return ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); + } + +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java index 766ce0d7136..27bfed87553 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java @@ -18,19 +18,16 @@ package org.apache.beam.runners.core.construction.graph; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.Iterables.getOnlyElement; - import java.util.Collection; -import java.util.Optional; +import java.util.List; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; -import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; @@ -84,64 +81,72 @@ * follows: * * <ul> - * <li>The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * <li>The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * that executable stages are treated as primitive transforms. * <li>The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * <li>The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. + * <li>The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input + * and output PCollections set to the same values as the outer PTransform itself. It further + * contains the environment set of transforms for this stage. * </ul> + * + * <p>The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload} + * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}. */ default PTransform toPTransform() { + ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder(); + + payload.setEnvironment(getEnvironment()); + + PCollectionNode input = getInputPCollection(); + payload.setInput(input.getId()); + + for (PTransformNode transform : getTransforms()) { + payload.addTransforms(transform.getId()); + } + + for (PCollectionNode output : getOutputPCollections()) { + payload.addOutputs(output.getId()); + } + PTransform.Builder pt = PTransform.newBuilder(); + pt.setSpec(FunctionSpec.newBuilder() + .setUrn(ExecutableStage.URN) + .setPayload(payload.build().toByteString()) + .build()); pt.putInputs("input", getInputPCollection().getId()); - int i = 0; - for (PCollectionNode materializedPCollection : getOutputPCollections()) { - pt.putOutputs(String.format("materialized_%s", i), materializedPCollection.getId()); - i++; - } - for (PTransformNode fusedTransform : getTransforms()) { - pt.addSubtransforms(fusedTransform.getId()); + int outputIndex = 0; + for (PCollectionNode pcNode : getOutputPCollections()) { + // Do something + pt.putOutputs(String.format("materialized_%d", outputIndex), pcNode.getId()); + outputIndex++; } - pt.setSpec(FunctionSpec.newBuilder().setUrn(ExecutableStage.URN)); return pt.build(); } + // TODO: Should this live under ExecutableStageTranslation? /** - * Return an {@link ExecutableStage} constructed from the provided {@link PTransform} + * Return an {@link ExecutableStage} constructed from the provided {@link FunctionSpec} * representation. * - * <p>See {@link #toPTransform()} for information about the required format of the {@link - * PTransform}. The environment will be determined by an arbitrary {@link PTransform} contained - * within the {@link PTransform#getSubtransformsList()}. + * <p>See {@link #toPTransform()} for how the payload is constructed. Note that the payload + * contains some information redundant with the {@link PTransform} due to runner implementations + * not having the full transform context at translation time, but rather access to an + * {@link org.apache.beam.sdk.runners.AppliedPTransform}. */ - static ExecutableStage fromPTransform(PTransform ptransform, Components components) { - checkArgument(ptransform.getSpec().getUrn().equals(URN)); - // It may be better to put this in an explicit Payload if other metadata becomes required - Optional<Environment> environment = - Environments.getEnvironment(ptransform.getSubtransforms(0), components); - checkArgument( - environment.isPresent(), - "%s with no %s", - ExecutableStage.class.getSimpleName(), - Environment.class.getSimpleName()); - String inputId = getOnlyElement(ptransform.getInputsMap().values()); - PCollectionNode inputNode = - PipelineNode.pCollection(inputId, components.getPcollectionsOrThrow(inputId)); - Collection<PCollectionNode> outputNodes = - ptransform - .getOutputsMap() - .values() - .stream() - .map(id -> PipelineNode.pCollection(id, components.getPcollectionsOrThrow(id))) - .collect(Collectors.toSet()); - Collection<PTransformNode> transformNodes = - ptransform - .getSubtransformsList() - .stream() - .map(id -> PipelineNode.pTransform(id, components.getTransformsOrThrow(id))) - .collect(Collectors.toSet()); - return ImmutableExecutableStage.of(environment.get(), inputNode, transformNodes, outputNodes); + static ExecutableStage fromPayload(ExecutableStagePayload payload, Components components) { + Environment environment = payload.getEnvironment(); + PCollectionNode input = PipelineNode.pCollection(payload.getInput(), + components.getPcollectionsOrThrow(payload.getInput())); + List<PTransformNode> transforms = payload.getTransformsList().stream() + .map(id -> PipelineNode.pTransform(id, components.getTransformsOrThrow(id))) + .collect(Collectors.toList()); + List<PCollectionNode> outputs = payload.getOutputsList().stream() + .map(id -> PipelineNode.pCollection(id, components.getPcollectionsOrThrow(id))) + .collect(Collectors.toList()); + return ImmutableExecutableStage.of(environment, input, transforms, outputs); } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java index e13eb6f80cf..5bbefc56c8a 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java @@ -29,6 +29,7 @@ import java.util.Collections; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; @@ -46,7 +47,7 @@ @RunWith(JUnit4.class) public class ExecutableStageTest { @Test - public void testRoundTripToFromTransform() { + public void testRoundTripToFromTransform() throws Exception { Environment env = Environment.newBuilder().setUrl("foo").build(); PTransform pt = PTransform.newBuilder() @@ -84,13 +85,15 @@ public void testRoundTripToFromTransform() { assertThat(stagePTransform.getOutputsCount(), equalTo(1)); assertThat(stagePTransform.getInputsMap(), hasValue("input.out")); assertThat(stagePTransform.getInputsCount(), equalTo(1)); - assertThat(stagePTransform.getSubtransformsList(), contains("pt")); - assertThat(ExecutableStage.fromPTransform(stagePTransform, components), equalTo(stage)); + ExecutableStagePayload payload = ExecutableStagePayload.parseFrom( + stagePTransform.getSpec().getPayload()); + assertThat(payload.getTransformsList(), contains("pt")); + assertThat(ExecutableStage.fromPayload(payload, components), equalTo(stage)); } @Test - public void testRoundTripToFromTransformFused() { + public void testRoundTripToFromTransformFused() throws Exception { PTransform parDoTransform = PTransform.newBuilder() .putInputs("input", "impulse.out") @@ -148,9 +151,11 @@ public void testRoundTripToFromTransformFused() { assertThat(ptransform.getSpec().getUrn(), equalTo(ExecutableStage.URN)); assertThat(ptransform.getInputsMap().values(), containsInAnyOrder("impulse.out")); assertThat(ptransform.getOutputsMap().values(), emptyIterable()); - assertThat(ptransform.getSubtransformsList(), contains("parDo", "window")); - ExecutableStage desered = ExecutableStage.fromPTransform(ptransform, components); + ExecutableStagePayload payload = ExecutableStagePayload.parseFrom( + ptransform.getSpec().getPayload()); + assertThat(payload.getTransformsList(), contains("parDo", "window")); + ExecutableStage desered = ExecutableStage.fromPayload(payload, components); assertThat(desered, equalTo(subgraph)); } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java index 9f3355f4212..09c37c6450b 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java @@ -20,7 +20,6 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -30,6 +29,7 @@ import com.google.common.collect.ImmutableSet; import java.util.Collections; import java.util.Set; +import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; @@ -44,6 +44,8 @@ import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -239,8 +241,7 @@ public void fusesCompatibleEnvironments() { PipelineNode.pTransform("window", windowTransform))); // Nothing consumes the outputs of ParDo or Window, so they don't have to be materialized assertThat(subgraph.getOutputPCollections(), emptyIterable()); - assertThat( - subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo", "window")); + assertThat(subgraph, hasSubtransforms("parDo", "window")); } @Test @@ -299,8 +300,7 @@ public void materializesWithStatefulConsumer() { contains( PipelineNode.pCollection( "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build()))); - assertThat( - subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo")); + assertThat(subgraph, hasSubtransforms("parDo")); } @Test @@ -359,8 +359,7 @@ public void materializesWithConsumerWithTimer() { contains( PipelineNode.pCollection( "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build()))); - assertThat( - subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo")); + assertThat(subgraph, hasSubtransforms("parDo")); } @Test @@ -440,9 +439,7 @@ public void fusesFlatten() { GreedyStageFuser.forGrpcPortRead( p, impulseOutputNode, p.getPerElementConsumers(impulseOutputNode)); assertThat(subgraph.getOutputPCollections(), emptyIterable()); - assertThat( - subgraph.toPTransform().getSubtransformsList(), - containsInAnyOrder("read", "parDo", "flatten", "window")); + assertThat(subgraph, hasSubtransforms("read", "parDo", "flatten", "window")); } @Test @@ -524,9 +521,7 @@ public void fusesFlattenWithDifferentEnvironmentInputs() { GreedyStageFuser.forGrpcPortRead( p, impulseOutputNode, ImmutableSet.of(PipelineNode.pTransform("read", readTransform))); assertThat(subgraph.getOutputPCollections(), emptyIterable()); - assertThat( - subgraph.toPTransform().getSubtransformsList(), - containsInAnyOrder("read", "flatten", "window")); + assertThat(subgraph, hasSubtransforms("read", "flatten", "window")); // Flatten shows up in both of these subgraphs, but elements only go through a path to the // flatten once. @@ -540,9 +535,7 @@ public void fusesFlattenWithDifferentEnvironmentInputs() { contains( PipelineNode.pCollection( "flatten.out", components.getPcollectionsOrThrow("flatten.out")))); - assertThat( - readFromOtherEnv.toPTransform().getSubtransformsList(), - containsInAnyOrder("envRead", "flatten")); + assertThat(readFromOtherEnv, hasSubtransforms("envRead", "flatten")); } @Test @@ -892,7 +885,7 @@ public void materializesWithSideInputConsumer() { GreedyStageFuser.forGrpcPortRead( p, impulseOutputNode, ImmutableSet.of(readNode)); assertThat(subgraph.getOutputPCollections(), contains(readOutput)); - assertThat(subgraph.toPTransform().getSubtransformsList(), contains(readNode.getId())); + assertThat(subgraph, hasSubtransforms(readNode.getId())); } @Test @@ -943,6 +936,28 @@ public void materializesWithGroupByKeyConsumer() { GreedyStageFuser.forGrpcPortRead( p, impulseOutputNode, ImmutableSet.of(readNode)); assertThat(subgraph.getOutputPCollections(), contains(readOutput)); - assertThat(subgraph.toPTransform().getSubtransformsList(), contains(readNode.getId())); + assertThat(subgraph, hasSubtransforms(readNode.getId())); + } + + private static TypeSafeMatcher<ExecutableStage> hasSubtransforms(String id, String... ids) { + Set<String> expectedTransforms = ImmutableSet.<String>builder().add(id).add(ids).build(); + return new TypeSafeMatcher<ExecutableStage>() { + @Override + protected boolean matchesSafely(ExecutableStage executableStage) { + // NOTE: Transform names must be unique, so it's fine to throw here if this does not hold. + Set<String> stageTransforms = executableStage.getTransforms().stream() + .map(PTransformNode::getId) + .collect(Collectors.toSet()); + return stageTransforms.containsAll(expectedTransforms) + && expectedTransforms.containsAll(stageTransforms); + } + + @Override + public void describeTo(Description description) { + description.appendText( + "ExecutableStage with subtransform ids: " + expectedTransforms); + } + }; } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 82057) Time Spent: 13h 10m (was: 13h) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > ------------------------------------------------------------------------------------------------ > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core > Reporter: Thomas Groh > Assignee: Thomas Groh > Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 13h 10m > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms. The java > runner libraries should provide some way to take a Pipeline that executes in > both a runner and an environment and construct a collection of transforms > which can execute within a single environment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)