This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 875f3e1  Translate Flink batch nodes by URN
875f3e1 is described below

commit 875f3e1b0c943b56bda99e610ede872a07573be5
Author: Ben Sidhom <sid...@google.com>
AuthorDate: Mon Dec 18 11:26:36 2017 -0800

    Translate Flink batch nodes by URN
    
    Before this change, the translator downcast pipeline transforms to
    their concrete classes. This cannot be done for jobs that have been
    submitted portably through the Job API. This change instead uses URNs
    (available on rehydrated pipelines) for translation.
---
 .../flink/FlinkBatchTransformTranslators.java      | 27 ++++++++++++++--------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 3689698..a4265f5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
@@ -96,29 +98,34 @@ class FlinkBatchTransformTranslators {
 
   @SuppressWarnings("rawtypes")
   private static final Map<
-      Class<? extends PTransform>,
+      String,
       FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new 
HashMap<>();
 
   static {
-    TRANSLATORS.put(View.CreatePCollectionView.class, new 
CreatePCollectionViewTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
+        new CreatePCollectionViewTranslatorBatch());
 
-    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
-    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.COMBINE_TRANSFORM_URN,
+        new CombinePerKeyTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        new GroupByKeyTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new 
ReshuffleTranslatorBatch());
 
-    TRANSLATORS.put(Flatten.PCollections.class, new 
FlattenPCollectionTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        new FlattenPCollectionTranslatorBatch());
 
-    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.WINDOW_TRANSFORM_URN, new 
WindowAssignTranslatorBatch());
 
-    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new 
ParDoTranslatorBatch());
 
-    TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new 
ReadSourceTranslatorBatch());
   }
 
 
   static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> 
getTranslator(
       PTransform<?, ?> transform) {
-    return TRANSLATORS.get(transform.getClass());
+    @Nullable String urn = 
PTransformTranslation.urnForTransformOrNull(transform);
+    return urn == null ? null : TRANSLATORS.get(urn);
   }
 
   private static class ReadSourceTranslatorBatch<T>

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.

Reply via email to