This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 342fcdb  [BEAM-9340] Validate pipeline requirements in PipelineValidator.
     new e8cb91f  Merge pull request #11224 from robertwb/java-validate-requirements
342fcdb is described below

commit 342fcdb8b0fbaae8de286968853abd34e7d862ff
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Wed Mar 25 11:34:29 2020 -0700

    [BEAM-9340] Validate pipeline requirements in PipelineValidator.
    
    Also fix a bug where the greedy fuser was not propagating requirements.
---
 .../core/construction/graph/FusedPipeline.java     |  9 +++-
 .../construction/graph/GreedyPipelineFuser.java    | 13 ++++--
 .../core/construction/graph/PipelineValidator.java | 51 ++++++++++++++++------
 .../graph/GreedyPipelineFuserTest.java             | 19 ++++++--
 4 files changed, 71 insertions(+), 21 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index a67c36d..f9af790 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -38,8 +38,9 @@ public abstract class FusedPipeline {
   static FusedPipeline of(
       Components components,
       Set<ExecutableStage> environmentalStages,
-      Set<PTransformNode> runnerStages) {
-    return new AutoValue_FusedPipeline(components, environmentalStages, 
runnerStages);
+      Set<PTransformNode> runnerStages,
+      Set<String> requirements) {
+    return new AutoValue_FusedPipeline(components, environmentalStages, 
runnerStages, requirements);
   }
 
   abstract Components getComponents();
@@ -50,6 +51,9 @@ public abstract class FusedPipeline {
   /** The {@link PTransform PTransforms} that a runner is responsible for 
executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  /** The requirements of the original pipeline, which are carried over into the fused pipeline. */
+  public abstract Set<String> getRequirements();
+
   /**
    * Returns the {@link RunnerApi.Pipeline} representation of this {@link 
FusedPipeline}.
    *
@@ -84,6 +88,7 @@ public abstract class FusedPipeline {
         Pipeline.newBuilder()
             .setComponents(fusedComponents)
             .addAllRootTransformIds(rootTransformIds)
+            .addAllRequirements(getRequirements())
             .build();
     // Validate that fusion didn't produce a malformed pipeline.
     PipelineValidator.validate(res);
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
index 6184498..dbfe6c6 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
@@ -46,6 +46,7 @@ import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNo
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.slf4j.Logger;
@@ -72,7 +73,11 @@ public class GreedyPipelineFuser {
       unfusedRootNodes.addAll(descendants.getUnfusedNodes());
       rootConsumers.addAll(descendants.getFusibleConsumers());
     }
-    this.fusedPipeline = fusePipeline(unfusedRootNodes, 
groupSiblings(rootConsumers));
+    this.fusedPipeline =
+        fusePipeline(
+            unfusedRootNodes,
+            groupSiblings(rootConsumers),
+            ImmutableSet.copyOf(p.getRequirementsList()));
   }
 
   /**
@@ -114,7 +119,8 @@ public class GreedyPipelineFuser {
    */
   private FusedPipeline fusePipeline(
       Collection<PTransformNode> initialUnfusedTransforms,
-      NavigableSet<NavigableSet<CollectionConsumer>> initialConsumers) {
+      NavigableSet<NavigableSet<CollectionConsumer>> initialConsumers,
+      Set<String> requirements) {
     Map<CollectionConsumer, ExecutableStage> consumedCollectionsAndTransforms 
= new HashMap<>();
     Set<ExecutableStage> stages = new LinkedHashSet<>();
     Set<PTransformNode> unfusedTransforms = new 
LinkedHashSet<>(initialUnfusedTransforms);
@@ -174,7 +180,8 @@ public class GreedyPipelineFuser {
                         deduplicated
                             .getDeduplicatedTransforms()
                             .getOrDefault(transform.getId(), transform))
-                .collect(Collectors.toSet())));
+                .collect(Collectors.toSet())),
+        requirements);
   }
 
   private DescendantConsumers getRootConsumers(PTransformNode rootNode) {
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
index c0ffb5c..c80947b 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction.graph;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import java.util.Map;
+import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -31,7 +32,9 @@ import 
org.apache.beam.model.pipeline.v1.RunnerApi.TestStreamPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /**
@@ -42,7 +45,9 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 public class PipelineValidator {
   @FunctionalInterface
   private interface TransformValidator {
-    void validate(String transformId, PTransform transform, Components 
components) throws Exception;
+    void validate(
+        String transformId, PTransform transform, Components components, 
Set<String> requirements)
+        throws Exception;
   }
 
   private static final ImmutableMap<String, TransformValidator> VALIDATORS =
@@ -95,10 +100,11 @@ public class PipelineValidator {
           transformId);
     }
 
-    validateComponents("pipeline", components);
+    validateComponents("pipeline", components, 
ImmutableSet.copyOf(p.getRequirementsList()));
   }
 
-  private static void validateComponents(String context, Components 
components) {
+  private static void validateComponents(
+      String context, Components components, Set<String> requirements) {
     {
       Map<String, String> uniqueNamesById = Maps.newHashMap();
       for (String transformId : components.getTransformsMap().keySet()) {
@@ -114,7 +120,7 @@ public class PipelineValidator {
             transformId,
             previousId,
             transform.getUniqueName());
-        validateTransform(transformId, transform, components);
+        validateTransform(transformId, transform, components, requirements);
       }
     }
     {
@@ -172,7 +178,8 @@ public class PipelineValidator {
     }
   }
 
-  private static void validateTransform(String id, PTransform transform, 
Components components) {
+  private static void validateTransform(
+      String id, PTransform transform, Components components, Set<String> 
requirements) {
     for (String subtransformId : transform.getSubtransformsList()) {
       checkArgument(
           components.containsTransforms(subtransformId),
@@ -203,14 +210,15 @@ public class PipelineValidator {
     String urn = transform.getSpec().getUrn();
     if (VALIDATORS.containsKey(urn)) {
       try {
-        VALIDATORS.get(urn).validate(id, transform, components);
+        VALIDATORS.get(urn).validate(id, transform, components, requirements);
       } catch (Exception e) {
         throw new RuntimeException(String.format("Failed to validate transform 
%s", id), e);
       }
     }
   }
 
-  private static void validateParDo(String id, PTransform transform, 
Components components)
+  private static void validateParDo(
+      String id, PTransform transform, Components components, Set<String> 
requirements)
       throws Exception {
     ParDoPayload payload = 
ParDoPayload.parseFrom(transform.getSpec().getPayload());
     // side_inputs
@@ -221,23 +229,39 @@ public class PipelineValidator {
           id,
           sideInputId);
     }
-    // TODO: Validate state_specs and timer_specs
+    if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+      
checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN));
+      // TODO: Validate state_specs and timer_specs
+    }
     if (!payload.getRestrictionCoderId().isEmpty()) {
       
checkArgument(components.containsCoders(payload.getRestrictionCoderId()));
+      
checkArgument(requirements.contains(ParDoTranslation.REQUIRES_SPLITTABLE_DOFN_URN));
+    }
+    if (payload.getRequestsFinalization()) {
+      
checkArgument(requirements.contains(ParDoTranslation.REQUIRES_BUNDLE_FINALIZATION_URN));
+    }
+    if (payload.getRequiresStableInput()) {
+      
checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STABLE_INPUT_URN));
+    }
+    if (payload.getRequiresTimeSortedInput()) {
+      
checkArgument(requirements.contains(ParDoTranslation.REQUIRES_TIME_SORTED_INPUT_URN));
     }
   }
 
-  private static void validateAssignWindows(String id, PTransform transform, 
Components components)
+  private static void validateAssignWindows(
+      String id, PTransform transform, Components components, Set<String> 
requirements)
       throws Exception {
     WindowIntoPayload.parseFrom(transform.getSpec().getPayload());
   }
 
-  private static void validateTestStream(String id, PTransform transform, 
Components components)
+  private static void validateTestStream(
+      String id, PTransform transform, Components components, Set<String> 
requirements)
       throws Exception {
     TestStreamPayload.parseFrom(transform.getSpec().getPayload());
   }
 
-  private static void validateCombine(String id, PTransform transform, 
Components components)
+  private static void validateCombine(
+      String id, PTransform transform, Components components, Set<String> 
requirements)
       throws Exception {
     CombinePayload payload = 
CombinePayload.parseFrom(transform.getSpec().getPayload());
     checkArgument(
@@ -247,7 +271,8 @@ public class PipelineValidator {
   }
 
   private static void validateExecutableStage(
-      String id, PTransform transform, Components outerComponents) throws 
Exception {
+      String id, PTransform transform, Components outerComponents, Set<String> 
requirements)
+      throws Exception {
     ExecutableStagePayload payload =
         ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
 
@@ -278,7 +303,7 @@ public class PipelineValidator {
           outputId);
     }
 
-    validateComponents("ExecutableStage " + id, components);
+    validateComponents("ExecutableStage " + id, components, requirements);
 
     // TODO: Also validate that side inputs of all transforms within 
components.getTransforms()
     // are contained within payload.getSideInputsList()
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 0a32f9e..97b8091 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -48,6 +48,7 @@ import 
org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.hamcrest.Matchers;
@@ -980,7 +981,11 @@ public class GreedyPipelineFuserTest {
             .putEnvironments("common", 
Environments.createDockerEnvironment("common"))
             .build();
     FusedPipeline fused =
-        
GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+        GreedyPipelineFuser.fuse(
+            Pipeline.newBuilder()
+                .setComponents(components)
+                
.addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN)
+                .build());
 
     assertThat(
         fused.getRunnerExecutedTransforms(),
@@ -1054,7 +1059,11 @@ public class GreedyPipelineFuserTest {
             .build();
 
     FusedPipeline fused =
-        
GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+        GreedyPipelineFuser.fuse(
+            Pipeline.newBuilder()
+                .setComponents(components)
+                
.addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN)
+                .build());
 
     assertThat(
         fused.getRunnerExecutedTransforms(),
@@ -1107,7 +1116,11 @@ public class GreedyPipelineFuserTest {
             .build();
 
     FusedPipeline fused =
-        
GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+        GreedyPipelineFuser.fuse(
+            Pipeline.newBuilder()
+                .setComponents(components)
+                
.addRequirements(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN)
+                .build());
 
     assertThat(
         fused.getRunnerExecutedTransforms(),

Reply via email to