This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 342fcdb [BEAM-9340] Validate pipeline requirements in PipelineValidator. new e8cb91f Merge pull request #11224 from robertwb/java-validate-requirements 342fcdb is described below commit 342fcdb8b0fbaae8de286968853abd34e7d862ff Author: Robert Bradshaw <rober...@gmail.com> AuthorDate: Wed Mar 25 11:34:29 2020 -0700 [BEAM-9340] Validate pipeline requirements in PipelineValidator. Also fix a bug where the greedy fuser was not propagating requirements. --- .../core/construction/graph/FusedPipeline.java | 9 +++- .../construction/graph/GreedyPipelineFuser.java | 13 ++++-- .../core/construction/graph/PipelineValidator.java | 51 ++++++++++++++++------ .../graph/GreedyPipelineFuserTest.java | 19 ++++++-- 4 files changed, 71 insertions(+), 21 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java index a67c36d..f9af790 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java @@ -38,8 +38,9 @@ public abstract class FusedPipeline { static FusedPipeline of( Components components, Set<ExecutableStage> environmentalStages, - Set<PTransformNode> runnerStages) { - return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages); + Set<PTransformNode> runnerStages, + Set<String> requirements) { + return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages, requirements); } abstract Components getComponents(); @@ -50,6 +51,9 @@ public abstract class FusedPipeline { /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ public abstract Set<PTransformNode> getRunnerExecutedTransforms(); + /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ + public abstract Set<String> getRequirements(); + /** * Returns the {@link RunnerApi.Pipeline} representation of this {@link FusedPipeline}. * @@ -84,6 +88,7 @@ public abstract class FusedPipeline { Pipeline.newBuilder() .setComponents(fusedComponents) .addAllRootTransformIds(rootTransformIds) + .addAllRequirements(getRequirements()) .build(); // Validate that fusion didn't produce a malformed pipeline. PipelineValidator.validate(res); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java index 6184498..dbfe6c6 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNo import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.slf4j.Logger; @@ -72,7 +73,11 @@ public class GreedyPipelineFuser { unfusedRootNodes.addAll(descendants.getUnfusedNodes()); rootConsumers.addAll(descendants.getFusibleConsumers()); } - this.fusedPipeline = fusePipeline(unfusedRootNodes, groupSiblings(rootConsumers)); + this.fusedPipeline = + fusePipeline( + unfusedRootNodes, + groupSiblings(rootConsumers), + ImmutableSet.copyOf(p.getRequirementsList())); } /** @@ -114,7 +119,8 @@ public class GreedyPipelineFuser { */ private FusedPipeline fusePipeline( Collection<PTransformNode> initialUnfusedTransforms, - NavigableSet<NavigableSet<CollectionConsumer>> initialConsumers) { + NavigableSet<NavigableSet<CollectionConsumer>> initialConsumers, + Set<String> requirements) { Map<CollectionConsumer, ExecutableStage> consumedCollectionsAndTransforms = new HashMap<>(); Set<ExecutableStage> stages = new LinkedHashSet<>(); Set<PTransformNode> unfusedTransforms = new LinkedHashSet<>(initialUnfusedTransforms); @@ -174,7 +180,8 @@ public class GreedyPipelineFuser { deduplicated .getDeduplicatedTransforms() .getOrDefault(transform.getId(), transform)) - .collect(Collectors.toSet()))); + .collect(Collectors.toSet())), + requirements); } private DescendantConsumers getRootConsumers(PTransformNode rootNode) { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java index c0ffb5c..c80947b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction.graph; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import java.util.Map; +import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; @@ -31,7 +32,9 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.TestStreamPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; /** @@ -42,7 +45,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; public class PipelineValidator { @FunctionalInterface private interface TransformValidator { - void validate(String transformId, PTransform transform, Components components) throws Exception; + void validate( + String transformId, PTransform transform, Components components, Set<String> requirements) + throws Exception; } private static final ImmutableMap<String, TransformValidator> VALIDATORS = @@ -95,10 +100,11 @@ public class PipelineValidator { transformId); } - validateComponents("pipeline", components); + validateComponents("pipeline", components, ImmutableSet.copyOf(p.getRequirementsList())); } - private static void validateComponents(String context, Components components) { + private static void validateComponents( + String context, Components components, Set<String> requirements) { { Map<String, String> uniqueNamesById = Maps.newHashMap(); for (String transformId : components.getTransformsMap().keySet()) { @@ -114,7 +120,7 @@ public class PipelineValidator { transformId, previousId, transform.getUniqueName()); - validateTransform(transformId, transform, components); + validateTransform(transformId, transform, components, requirements); } } { @@ -172,7 +178,8 @@ public class PipelineValidator { } } - private static void validateTransform(String id, PTransform transform, Components components) { + private static void validateTransform( + String id, PTransform transform, Components components, Set<String> requirements) { for (String subtransformId : transform.getSubtransformsList()) { checkArgument( components.containsTransforms(subtransformId), @@ -203,14 +210,15 @@ public class PipelineValidator { String urn = transform.getSpec().getUrn(); if (VALIDATORS.containsKey(urn)) { try { - VALIDATORS.get(urn).validate(id, transform, components); + VALIDATORS.get(urn).validate(id, transform, components, requirements); } catch (Exception e) { throw new RuntimeException(String.format("Failed to validate transform %s", id), e); } } } - private static void validateParDo(String id, PTransform transform, Components components) + private static void validateParDo( + String id, PTransform transform, Components components, Set<String> requirements) throws Exception { ParDoPayload payload = ParDoPayload.parseFrom(transform.getSpec().getPayload()); // side_inputs @@ -221,23 +229,39 @@ public class PipelineValidator { id, sideInputId); } - // TODO: Validate state_specs and timer_specs + if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) { + checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN)); + // TODO: Validate state_specs and timer_specs + } if (!payload.getRestrictionCoderId().isEmpty()) { checkArgument(components.containsCoders(payload.getRestrictionCoderId())); + checkArgument(requirements.contains(ParDoTranslation.REQUIRES_SPLITTABLE_DOFN_URN)); + } + if (payload.getRequestsFinalization()) { + checkArgument(requirements.contains(ParDoTranslation.REQUIRES_BUNDLE_FINALIZATION_URN)); + } + if (payload.getRequiresStableInput()) { + checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STABLE_INPUT_URN)); + } + if (payload.getRequiresTimeSortedInput()) { + checkArgument(requirements.contains(ParDoTranslation.REQUIRES_TIME_SORTED_INPUT_URN)); } } - private static void validateAssignWindows(String id, PTransform transform, Components components) + private static void validateAssignWindows( + String id, PTransform transform, Components components, Set<String> requirements) throws Exception { WindowIntoPayload.parseFrom(transform.getSpec().getPayload()); } - private static void validateTestStream(String id, PTransform transform, Components components) + private static void validateTestStream( + String id, PTransform transform, Components components, Set<String> requirements) throws Exception { TestStreamPayload.parseFrom(transform.getSpec().getPayload()); } - private static void validateCombine(String id, PTransform transform, Components components) + private static void validateCombine( + String id, PTransform transform, Components components, Set<String> requirements) throws Exception { CombinePayload payload = CombinePayload.parseFrom(transform.getSpec().getPayload()); checkArgument( @@ -247,7 +271,8 @@ public class PipelineValidator { } private static void validateExecutableStage( - String id, PTransform transform, Components outerComponents) throws Exception { + String id, PTransform transform, Components outerComponents, Set<String> requirements) + throws Exception { ExecutableStagePayload payload = ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); @@ -278,7 +303,7 @@ public class PipelineValidator { outputId); } - validateComponents("ExecutableStage " + id, components); + validateComponents("ExecutableStage " + id, components, requirements); // TODO: Also validate that side inputs of all transforms within components.getTransforms() // are contained within payload.getSideInputsList() diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java index 0a32f9e..97b8091 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java @@ -48,6 +48,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; import org.hamcrest.Matchers; @@ -980,7 +981,11 @@ public class GreedyPipelineFuserTest { .putEnvironments("common", Environments.createDockerEnvironment("common")) .build(); FusedPipeline fused = - GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build()); + GreedyPipelineFuser.fuse( + Pipeline.newBuilder() + .setComponents(components) + .addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN) + .build()); assertThat( fused.getRunnerExecutedTransforms(), @@ -1054,7 +1059,11 @@ public class GreedyPipelineFuserTest { .build(); FusedPipeline fused = - GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build()); + GreedyPipelineFuser.fuse( + Pipeline.newBuilder() + .setComponents(components) + .addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN) + .build()); assertThat( fused.getRunnerExecutedTransforms(), @@ -1107,7 +1116,11 @@ public class GreedyPipelineFuserTest { .build(); FusedPipeline fused = - GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build()); + GreedyPipelineFuser.fuse( + Pipeline.newBuilder() + .setComponents(components) + .addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN) + .build()); assertThat( fused.getRunnerExecutedTransforms(),