This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch release-2.3.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.3.0 by this push:
     new 513c5e4  [BEAM-2806] Fix pipeline translation mode recognition in Flink Runner
513c5e4 is described below

commit 513c5e48a792d0e3488f520d0510f3f1ed0be864
Author: Grzegorz Kołakowski <grzegorz.kolakow...@getindata.com>
AuthorDate: Thu Feb 1 09:21:24 2018 +0100

    [BEAM-2806] Fix pipeline translation mode recognition in Flink Runner
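
    The translation-mode detection no longer keys off the Read.Unbounded
    transform class: the optimizer now marks the pipeline as streaming when
    any primitive transform produces an unbounded PCollection, and the default
    Flink transform overrides are applied after that detection, driven by the
    detected mode rather than by the --streaming option alone.

    For context, a minimal sketch (not part of this commit) of the kind of
    pipeline the detection is meant to handle, built only from the public APIs
    the new test below also uses; the class name is illustrative:

        import org.apache.beam.runners.flink.FlinkPipelineOptions;
        import org.apache.beam.runners.flink.FlinkRunner;
        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.io.GenerateSequence;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;
        import org.joda.time.Duration;

        public class StreamingDetectionSketch {
          public static void main(String[] args) {
            FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
            options.setRunner(FlinkRunner.class);
            // setStreaming(true) is deliberately not called; the runner should
            // switch to streaming execution on its own.

            Pipeline pipeline = Pipeline.create(options);
            // GenerateSequence.withRate(...) yields an UNBOUNDED PCollection,
            // which the optimizer now recognizes when choosing the translation mode.
            pipeline.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)));

            pipeline.run();
          }
        }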
---
 .../flink/FlinkPipelineExecutionEnvironment.java   |  5 +-
 .../flink/PipelineTranslationOptimizer.java        | 16 ++++-
 .../FlinkPipelineExecutionEnvironmentTest.java     | 68 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 7a6c61f..7f7281e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -93,14 +93,15 @@ class FlinkPipelineExecutionEnvironment {
       throw new RuntimeException(e);
     }
 
-    pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
-
     PipelineTranslationOptimizer optimizer =
         new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
 
     optimizer.translate(pipeline);
     TranslationMode translationMode = optimizer.getTranslationMode();
 
+    pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(
+        translationMode == TranslationMode.STREAMING));
+
     FlinkPipelineTranslator translator;
     if (translationMode == TranslationMode.STREAMING) {
       this.flinkStreamEnv = createStreamExecutionEnvironment();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
index 3acc3ea..8877f1a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.runners.flink;
 
-import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,13 +62,21 @@ class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    Class<? extends PTransform> transformClass = node.getTransform().getClass();
-    if (transformClass == Read.Unbounded.class) {
+    AppliedPTransform<?, ?, ?> appliedPTransform = node.toAppliedPTransform(getPipeline());
+    if (hasUnboundedOutput(appliedPTransform)) {
+      Class<? extends PTransform> transformClass = node.getTransform().getClass();
       LOG.info("Found {}. Switching to streaming execution.", transformClass);
       translationMode = TranslationMode.STREAMING;
     }
   }
 
+  private boolean hasUnboundedOutput(AppliedPTransform<?, ?, ?> transform) {
+    return transform.getOutputs().values().stream()
+        .filter(value -> value instanceof PCollection)
+        .map(value -> (PCollection<?>) value)
+        .anyMatch(collection -> collection.isBounded() == IsBounded.UNBOUNDED);
+  }
+
   @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {}
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
new file mode 100644
index 0000000..0e5ce14
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FlinkPipelineExecutionEnvironment}.
+ */
+@RunWith(JUnit4.class)
+public class FlinkPipelineExecutionEnvironmentTest implements Serializable {
+
+  @Test
+  public void shouldRecognizeAndTranslateStreamingPipeline() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("[auto]");
+
+    FlinkRunner flinkRunner = FlinkRunner.fromOptions(options);
+    FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+    Pipeline pipeline = Pipeline.create();
+
+    pipeline
+        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
+        .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(Long.toString(c.element()));
+          }
+        }))
+        .apply(Window.into(FixedWindows.of(Duration.standardHours(1))))
+        .apply(TextIO.write().withNumShards(1).withWindowedWrites().to("/dummy/path"));
+
+    flinkEnv.translate(flinkRunner, pipeline);
+
+    // no exception should be thrown
+  }
+
+}
+
+

-- 
To stop receiving notification emails like this one, please contact
jbono...@apache.org.
