[ 
https://issues.apache.org/jira/browse/BEAM-2795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331082#comment-16331082
 ] 

ASF GitHub Bot commented on BEAM-2795:
--------------------------------------

kennknowles closed pull request #4384: [BEAM-2795] Use portable constructs in 
Flink streaming translator
URL: https://github.com/apache/beam/pull/4384
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
index c191eeb8617..c110c31c14f 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
@@ -46,8 +46,8 @@ public void visitPrimitiveTransform(Node node) {
 
           @Override
           public void visitValue(PValue value, Node producer) {
-            if (producer.getTransform() instanceof Read.Bounded
-                || producer.getTransform() instanceof Read.Unbounded) {
+            String urn = 
PTransformTranslation.urnForTransformOrNull(producer.getTransform());
+            if (PTransformTranslation.READ_TRANSFORM_URN.equals(urn)) {
               unconsumed.add((PCollection<?>) value);
             }
           }
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a2a2e75f8fa..f98312c1d2b 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -250,6 +250,11 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-pipeline</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index ceecc1fda58..e100a77d655 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -18,9 +18,11 @@
 package org.apache.beam.runners.flink;
 
 import com.google.common.collect.Iterables;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import 
org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -29,7 +31,6 @@
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
@@ -134,17 +135,28 @@ private CreateFlinkPCollectionView(PCollectionView<ViewT> 
view) {
 
   public static class Factory<ElemT, ViewT>
       implements PTransformOverrideFactory<
-          PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, 
ViewT>> {
+          PCollection<ElemT>,
+          PCollection<ElemT>,
+          PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
     public Factory() {}
 
     @Override
     public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> 
getReplacementTransform(
         AppliedPTransform<
-                PCollection<ElemT>, PCollection<ElemT>, 
CreatePCollectionView<ElemT, ViewT>>
-            transform) {
-      return PTransformReplacement.of(
-          (PCollection<ElemT>) 
Iterables.getOnlyElement(transform.getInputs().values()),
-          new CreateStreamingFlinkView<ElemT, 
ViewT>(transform.getTransform().getView()));
+            PCollection<ElemT>,
+            PCollection<ElemT>,
+            PTransform<PCollection<ElemT>, PCollection<ElemT>>> transform) {
+      PCollection<ElemT> collection =
+          (PCollection<ElemT>) 
Iterables.getOnlyElement(transform.getInputs().values());
+      PCollectionView<ViewT> view;
+      try {
+        view = CreatePCollectionViewTranslation.getView(transform);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      CreateStreamingFlinkView<ElemT, ViewT> createFlinkView =
+          new CreateStreamingFlinkView<ElemT, ViewT>(view);
+      return PTransformReplacement.of(collection, createFlinkView);
     }
 
     @Override
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index d2a2016c98a..7a6c61f8b36 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -19,7 +19,9 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.IOException;
 import java.util.List;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.CollectionEnvironment;
@@ -84,6 +86,13 @@ public void translate(FlinkRunner flinkRunner, Pipeline 
pipeline) {
     this.flinkBatchEnv = null;
     this.flinkStreamEnv = null;
 
+    // Serialize and rehydrate pipeline to make sure we only depend on serialized 
transforms.
+    try {
+      pipeline = 
PipelineTranslation.fromProto(PipelineTranslation.toProto(pipeline));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
     
pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
 
     PipelineTranslationOptimizer optimizer =
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index f733e2e7513..2e16ed9966c 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -19,6 +19,7 @@
 
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
@@ -109,9 +110,10 @@ public void 
visitPrimitiveTransform(TransformHierarchy.Node node) {
         FlinkStreamingTransformTranslators.getTranslator(transform);
 
     if (translator == null || !applyCanTranslate(transform, node, translator)) 
{
-      LOG.info(node.getTransform().getClass().toString());
+      String transformUrn = PTransformTranslation.urnForTransform(transform);
+      LOG.info(transformUrn);
       throw new UnsupportedOperationException(
-          "The transform " + transform + " is currently not supported.");
+          "The transform " + transformUrn + " is currently not supported.");
     }
     applyStreamingTransform(transform, node, translator);
   }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index aa5cc39f3e8..e1937707ba8 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -23,6 +23,7 @@
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,7 +35,10 @@
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.CombineTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import 
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
@@ -53,14 +57,13 @@
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -68,7 +71,6 @@
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -79,6 +81,7 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -150,11 +153,12 @@
   // 
--------------------------------------------------------------------------------------------
 
   private static class UnboundedReadSourceTranslator<T>
-      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      PTransform<PBegin, PCollection<T>>> {
 
     @Override
     public void translateNode(
-        Read.Unbounded<T> transform,
+        PTransform<PBegin, PCollection<T>> transform,
         FlinkStreamingTranslationContext context) {
       PCollection<T> output = context.getOutput(transform);
 
@@ -170,19 +174,28 @@ public void translateNode(
               ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
               output.getWindowingStrategy().getWindowFn().windowCoder()));
 
+      UnboundedSource<T, ?> rawSource;
+      try {
+        rawSource = ReadTranslation.unboundedSourceFromTransform(
+            (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>>)
+                context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
       try {
 
         UnboundedSourceWrapper<T, ?> sourceWrapper =
             new UnboundedSourceWrapper<>(
                 context.getCurrentTransform().getFullName(),
                 context.getPipelineOptions(),
-                transform.getSource(),
+                rawSource,
                 context.getExecutionEnvironment().getParallelism());
         nonDedupSource = context
             .getExecutionEnvironment()
             
.addSource(sourceWrapper).name(transform.getName()).returns(withIdTypeInfo);
 
-        if (transform.getSource().requiresDeduping()) {
+        if (rawSource.requiresDeduping()) {
           source = nonDedupSource.keyBy(
               new ValueWithRecordIdKeySelector<T>())
               .transform("deduping", outputTypeInfo, new 
DedupingOperator<T>());
@@ -191,7 +204,7 @@ public void translateNode(
         }
       } catch (Exception e) {
         throw new RuntimeException(
-            "Error while translating UnboundedSource: " + 
transform.getSource(), e);
+            "Error while translating UnboundedSource: " + rawSource, e);
       }
 
       context.setOutputDataStream(output, source);
@@ -237,7 +250,7 @@ public void flatMap(WindowedValue<ValueWithRecordId<T>> 
value,
     void translateNode(
         PTransform<PBegin, PCollection<T>> transform, 
FlinkStreamingTranslationContext context) {
       if 
(context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED))
 {
-        boundedTranslator.translateNode((Read.Bounded<T>) transform, context);
+        boundedTranslator.translateNode(transform, context);
       } else {
         unboundedTranslator.translateNode((Read.Unbounded<T>) transform, 
context);
       }
@@ -245,11 +258,12 @@ void translateNode(
   }
 
   private static class BoundedReadSourceTranslator<T>
-      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          PTransform<PBegin, PCollection<T>>> {
 
     @Override
     public void translateNode(
-        Read.Bounded<T> transform,
+        PTransform<PBegin, PCollection<T>> transform,
         FlinkStreamingTranslationContext context) {
       PCollection<T> output = context.getOutput(transform);
 
@@ -257,20 +271,29 @@ public void translateNode(
           context.getTypeInfo(context.getOutput(transform));
 
 
+      BoundedSource<T> rawSource;
+      try {
+        rawSource = ReadTranslation.boundedSourceFromTransform(
+            (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>>)
+                context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
       DataStream<WindowedValue<T>> source;
       try {
         BoundedSourceWrapper<T> sourceWrapper =
             new BoundedSourceWrapper<>(
                 context.getCurrentTransform().getFullName(),
                 context.getPipelineOptions(),
-                transform.getSource(),
+                rawSource,
                 context.getExecutionEnvironment().getParallelism());
         source = context
             .getExecutionEnvironment()
             
.addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
       } catch (Exception e) {
         throw new RuntimeException(
-            "Error while translating BoundedSource: " + transform.getSource(), 
e);
+            "Error while translating BoundedSource: " + rawSource, e);
       }
 
       context.setOutputDataStream(output, source);
@@ -353,7 +376,7 @@ public RawUnionValue map(T o) throws Exception {
   }
 
   /**
-   * Helper for translating {@link ParDo.MultiOutput} and {@link
+   * Helper for translating {@code ParDo.MultiOutput} and {@link
    * SplittableParDoViaKeyedWorkItems.ProcessElements}.
    */
   static class ParDoTranslationHelper {
@@ -403,7 +426,7 @@ public RawUnionValue map(T o) throws Exception {
         if (!tagsToOutputTags.containsKey(entry.getKey())) {
           tagsToOutputTags.put(
               entry.getKey(),
-              new OutputTag<>(
+              new OutputTag<WindowedValue<?>>(
                   entry.getKey().getId(),
                   (TypeInformation) context.getTypeInfo((PCollection<?>) 
entry.getValue())
               )
@@ -529,19 +552,47 @@ public RawUnionValue map(T o) throws Exception {
 
     @Override
     public void translateNode(
-        PTransform<PCollection<InputT>, PCollectionTuple> rawTransform,
+        PTransform<PCollection<InputT>, PCollectionTuple> transform,
         FlinkStreamingTranslationContext context) {
 
-      ParDo.MultiOutput<InputT, OutputT> transform = (ParDo.MultiOutput) 
rawTransform;
+      DoFn<InputT, OutputT> doFn;
+      try {
+        doFn = (DoFn<InputT, OutputT>) 
ParDoTranslation.getDoFn(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      TupleTag<OutputT> mainOutputTag;
+      try {
+        mainOutputTag = (TupleTag<OutputT>)
+            ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      List<PCollectionView<?>> sideInputs;
+      try {
+        sideInputs = 
ParDoTranslation.getSideInputs(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      TupleTagList additionalOutputTags;
+      try {
+        additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(
+            context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
 
       ParDoTranslationHelper.translateParDo(
           transform.getName(),
-          transform.getFn(),
-          (PCollection<InputT>) context.getInput(transform),
-          transform.getSideInputs(),
+          doFn,
+          context.getInput(transform),
+          sideInputs,
           context.getOutputs(transform),
-          transform.getMainOutputTag(),
-          transform.getAdditionalOutputTags().getAll(),
+          mainOutputTag,
+          additionalOutputTags.getAll(),
           context,
           new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
             @Override
@@ -653,11 +704,12 @@ public void translateNode(
   }
 
   private static class WindowAssignTranslator<T>
-      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          PTransform<PCollection<T>, PCollection<T>>> {
 
     @Override
     public void translateNode(
-        Window.Assign<T> transform,
+        PTransform<PCollection<T>, PCollection<T>> transform,
         FlinkStreamingTranslationContext context) {
 
       @SuppressWarnings("unchecked")
@@ -686,11 +738,12 @@ public void translateNode(
   }
 
   private static class ReshuffleTranslatorStreaming<K, InputT>
-      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, 
InputT>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>>> {
 
     @Override
     public void translateNode(
-        Reshuffle<K, InputT> transform,
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>> 
transform,
         FlinkStreamingTranslationContext context) {
 
       DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
@@ -703,11 +756,12 @@ public void translateNode(
 
 
   private static class GroupByKeyTranslator<K, InputT>
-      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, 
InputT>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, 
Iterable<InputT>>>>> {
 
     @Override
     public void translateNode(
-        GroupByKey<K, InputT> transform,
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, 
Iterable<InputT>>>> transform,
         FlinkStreamingTranslationContext context) {
 
       PCollection<KV<K, InputT>> input = context.getInput(transform);
@@ -785,11 +839,11 @@ public void translateNode(
 
   private static class CombinePerKeyTranslator<K, InputT, OutputT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      Combine.PerKey<K, InputT, OutputT>> {
+      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
 
     @Override
     boolean canTranslate(
-        Combine.PerKey<K, InputT, OutputT> transform,
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> 
transform,
         FlinkStreamingTranslationContext context) {
 
       // if we have a merging window strategy and side inputs we cannot
@@ -801,12 +855,19 @@ boolean canTranslate(
       WindowingStrategy<?, BoundedWindow> windowingStrategy =
           (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
 
-      return windowingStrategy.getWindowFn().isNonMerging() || 
transform.getSideInputs().isEmpty();
+      boolean hasNoSideInputs;
+      try {
+        hasNoSideInputs = 
CombineTranslation.getSideInputs(context.getCurrentTransform()).isEmpty();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return windowingStrategy.getWindowFn().isNonMerging() || hasNoSideInputs;
     }
 
     @Override
     public void translateNode(
-        Combine.PerKey<K, InputT, OutputT> transform,
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> 
transform,
         FlinkStreamingTranslationContext context) {
 
       PCollection<KV<K, InputT>> input = context.getInput(transform);
@@ -843,17 +904,29 @@ public void translateNode(
                 SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> 
keyedWorkItemStream = workItemStream
           .keyBy(new WorkItemKeySelector<K, 
InputT>(inputKvCoder.getKeyCoder()));
 
+      GlobalCombineFn<? super InputT, ?, OutputT> combineFn;
+      try {
+        combineFn = (GlobalCombineFn<? super InputT, ?, OutputT>)
+            CombineTranslation.getCombineFn(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
       SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = 
SystemReduceFn.combining(
           inputKvCoder.getKeyCoder(),
           AppliedCombineFn.withInputCoder(
-              transform.getFn(), input.getPipeline().getCoderRegistry(), 
inputKvCoder));
+              combineFn, input.getPipeline().getCoderRegistry(), 
inputKvCoder));
 
       Coder<WindowedValue<KV<K, OutputT>>> outputCoder =
           context.getCoder(context.getOutput(transform));
       TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+      List<PCollectionView<?>> sideInputs;
+      try {
+        sideInputs = 
CombineTranslation.getSideInputs(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
 
       if (sideInputs.isEmpty()) {
 
@@ -931,18 +1004,18 @@ public void translateNode(
 
   private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<K, InputT>> {
+      PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, 
InputT>>>> {
 
     @Override
     boolean canTranslate(
-        SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<K, InputT> 
transform,
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, 
InputT>>> transform,
         FlinkStreamingTranslationContext context) {
       return true;
     }
 
     @Override
     public void translateNode(
-        SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<K, InputT> 
transform,
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, 
InputT>>> transform,
         FlinkStreamingTranslationContext context) {
 
       PCollection<KV<K, InputT>> input = context.getInput(transform);
@@ -982,11 +1055,11 @@ public void translateNode(
 
   private static class FlattenPCollectionTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      Flatten.PCollections<T>> {
+        PTransform<PCollection<T>, PCollection<T>>> {
 
     @Override
     public void translateNode(
-        Flatten.PCollections<T> transform,
+        PTransform<PCollection<T>, PCollection<T>> transform,
         FlinkStreamingTranslationContext context) {
       Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
index 1dc8de91013..9baef8f492a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
@@ -21,10 +21,10 @@
 import java.util.List;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View;
 
 /**
  * {@link PTransform} overrides for Flink runner.
@@ -35,15 +35,16 @@
       return ImmutableList.<PTransformOverride>builder()
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.splittableParDoMulti(),
+                  PTransformMatchers.splittableParDo(),
                   new 
FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()))
           .add(
               PTransformOverride.of(
-                  
PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+                  PTransformMatchers.urnEqualTo(
+                      SplittableParDo.SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN),
                   new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
           .add(
               PTransformOverride.of(
-                  
PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+                  
PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
                   new CreateStreamingFlinkView.Factory()))
           .build();
     } else {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> FlinkRunner: translate using SDK-agnostic means
> -----------------------------------------------
>
>                 Key: BEAM-2795
>                 URL: https://issues.apache.org/jira/browse/BEAM-2795
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Kenneth Knowles
>            Priority: Major
>              Labels: portability
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to