[ https://issues.apache.org/jira/browse/BEAM-2795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331082#comment-16331082 ]
ASF GitHub Bot commented on BEAM-2795: -------------------------------------- kennknowles closed pull request #4384: [BEAM-2795] Use portable constructs in Flink streaming translator URL: https://github.com/apache/beam/pull/4384 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java index c191eeb8617..c110c31c14f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java @@ -46,8 +46,8 @@ public void visitPrimitiveTransform(Node node) { @Override public void visitValue(PValue value, Node producer) { - if (producer.getTransform() instanceof Read.Bounded - || producer.getTransform() instanceof Read.Unbounded) { + String urn = PTransformTranslation.urnForTransformOrNull(producer.getTransform()); + if (PTransformTranslation.READ_TRANSFORM_URN.equals(urn)) { unconsumed.add((PCollection<?>) value); } } diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index a2a2e75f8fa..f98312c1d2b 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -250,6 +250,11 @@ </exclusions> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-model-pipeline</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java index ceecc1fda58..e100a77d655 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java @@ -18,9 +18,11 @@ package org.apache.beam.runners.flink; import com.google.common.collect.Iterables; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -29,7 +31,6 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; @@ -134,17 +135,28 @@ private CreateFlinkPCollectionView(PCollectionView<ViewT> view) { public static class Factory<ElemT, ViewT> implements PTransformOverrideFactory< - PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> { + PCollection<ElemT>, + PCollection<ElemT>, + PTransform<PCollection<ElemT>, PCollection<ElemT>>> { public Factory() {} @Override public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform( AppliedPTransform< - PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> - transform) { - return PTransformReplacement.of( - (PCollection<ElemT>) Iterables.getOnlyElement(transform.getInputs().values()), - new CreateStreamingFlinkView<ElemT, ViewT>(transform.getTransform().getView())); + PCollection<ElemT>, + PCollection<ElemT>, + PTransform<PCollection<ElemT>, PCollection<ElemT>>> transform) { + PCollection<ElemT> collection = + (PCollection<ElemT>) Iterables.getOnlyElement(transform.getInputs().values()); + PCollectionView<ViewT> view; + try { + view = CreatePCollectionViewTranslation.getView(transform); + } catch (IOException e) { + throw new RuntimeException(e); + } + CreateStreamingFlinkView<ElemT, ViewT> createFlinkView = + new CreateStreamingFlinkView<ElemT, ViewT>(view); + return PTransformReplacement.of(collection, createFlinkView); } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index d2a2016c98a..7a6c61f8b36 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -19,7 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; import java.util.List; +import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.sdk.Pipeline; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.CollectionEnvironment; @@ -84,6 +86,13 @@ public void translate(FlinkRunner flinkRunner, Pipeline pipeline) { this.flinkBatchEnv = null; this.flinkStreamEnv = null; + // Serialize and rehydrate pipeline to make sure we only depend serialized transforms. + try { + pipeline = PipelineTranslation.fromProto(PipelineTranslation.toProto(pipeline)); + } catch (IOException e) { + throw new RuntimeException(e); + } + pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming())); PipelineTranslationOptimizer optimizer = diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index f733e2e7513..2e16ed9966c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -19,6 +19,7 @@ import java.util.Map; import org.apache.beam.runners.core.construction.PTransformReplacements; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.UnconsumedReads; @@ -109,9 +110,10 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { FlinkStreamingTransformTranslators.getTranslator(transform); if (translator == null || !applyCanTranslate(transform, node, translator)) { - LOG.info(node.getTransform().getClass().toString()); + String transformUrn = PTransformTranslation.urnForTransform(transform); + LOG.info(transformUrn); throw new UnsupportedOperationException( - "The transform " + transform + " is currently not supported."); + "The transform " + transformUrn + " is currently not supported."); } applyStreamingTransform(transform, node, translator); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index aa5cc39f3e8..e1937707ba8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -23,6 +23,7 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -34,7 +35,10 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.construction.CombineTranslation; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.ParDoTranslation; +import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; @@ -53,14 +57,13 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; @@ -68,7 +71,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.WindowedValue; @@ -79,6 +81,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -150,11 +153,12 @@ // -------------------------------------------------------------------------------------------- private static class UnboundedReadSourceTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + PTransform<PBegin, PCollection<T>>> { @Override public void translateNode( - Read.Unbounded<T> transform, + PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) { PCollection<T> output = context.getOutput(transform); @@ -170,19 +174,28 @@ public void translateNode( ValueWithRecordId.ValueWithRecordIdCoder.of(coder), output.getWindowingStrategy().getWindowFn().windowCoder())); + UnboundedSource<T, ?> rawSource; + try { + rawSource = ReadTranslation.unboundedSourceFromTransform( + (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>) + context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } + try { UnboundedSourceWrapper<T, ?> sourceWrapper = new UnboundedSourceWrapper<>( context.getCurrentTransform().getFullName(), context.getPipelineOptions(), - transform.getSource(), + rawSource, context.getExecutionEnvironment().getParallelism()); nonDedupSource = context .getExecutionEnvironment() .addSource(sourceWrapper).name(transform.getName()).returns(withIdTypeInfo); - if (transform.getSource().requiresDeduping()) { + if (rawSource.requiresDeduping()) { source = nonDedupSource.keyBy( new ValueWithRecordIdKeySelector<T>()) .transform("deduping", outputTypeInfo, new DedupingOperator<T>()); @@ -191,7 +204,7 @@ public void translateNode( } } catch (Exception e) { throw new RuntimeException( - "Error while translating UnboundedSource: " + transform.getSource(), e); + "Error while translating UnboundedSource: " + rawSource, e); } context.setOutputDataStream(output, source); @@ -237,7 +250,7 @@ public void flatMap(WindowedValue<ValueWithRecordId<T>> value, void translateNode( PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) { if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) { - boundedTranslator.translateNode((Read.Bounded<T>) transform, context); + boundedTranslator.translateNode(transform, context); } else { unboundedTranslator.translateNode((Read.Unbounded<T>) transform, context); } @@ -245,11 +258,12 @@ void translateNode( } private static class BoundedReadSourceTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> { + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + PTransform<PBegin, PCollection<T>>> { @Override public void translateNode( - Read.Bounded<T> transform, + PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) { PCollection<T> output = context.getOutput(transform); @@ -257,20 +271,29 @@ public void translateNode( context.getTypeInfo(context.getOutput(transform)); + BoundedSource<T> rawSource; + try { + rawSource = ReadTranslation.boundedSourceFromTransform( + (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>) + context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } + DataStream<WindowedValue<T>> source; try { BoundedSourceWrapper<T> sourceWrapper = new BoundedSourceWrapper<>( context.getCurrentTransform().getFullName(), context.getPipelineOptions(), - transform.getSource(), + rawSource, context.getExecutionEnvironment().getParallelism()); source = context .getExecutionEnvironment() .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); } catch (Exception e) { throw new RuntimeException( - "Error while translating BoundedSource: " + transform.getSource(), e); + "Error while translating BoundedSource: " + rawSource, e); } context.setOutputDataStream(output, source); @@ -353,7 +376,7 @@ public RawUnionValue map(T o) throws Exception { } /** - * Helper for translating {@link ParDo.MultiOutput} and {@link + * Helper for translating {@code ParDo.MultiOutput} and {@link * SplittableParDoViaKeyedWorkItems.ProcessElements}. */ static class ParDoTranslationHelper { @@ -403,7 +426,7 @@ public RawUnionValue map(T o) throws Exception { if (!tagsToOutputTags.containsKey(entry.getKey())) { tagsToOutputTags.put( entry.getKey(), - new OutputTag<>( + new OutputTag<WindowedValue<?>>( entry.getKey().getId(), (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue()) ) @@ -529,19 +552,47 @@ public RawUnionValue map(T o) throws Exception { @Override public void translateNode( - PTransform<PCollection<InputT>, PCollectionTuple> rawTransform, + PTransform<PCollection<InputT>, PCollectionTuple> transform, FlinkStreamingTranslationContext context) { - ParDo.MultiOutput<InputT, OutputT> transform = (ParDo.MultiOutput) rawTransform; + DoFn<InputT, OutputT> doFn; + try { + doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + TupleTag<OutputT> mainOutputTag; + try { + mainOutputTag = (TupleTag<OutputT>) + ParDoTranslation.getMainOutputTag(context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + List<PCollectionView<?>> sideInputs; + try { + sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + TupleTagList additionalOutputTags; + try { + additionalOutputTags = ParDoTranslation.getAdditionalOutputTags( + context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } ParDoTranslationHelper.translateParDo( transform.getName(), - transform.getFn(), - (PCollection<InputT>) context.getInput(transform), - transform.getSideInputs(), + doFn, + context.getInput(transform), + sideInputs, context.getOutputs(transform), - transform.getMainOutputTag(), - transform.getAdditionalOutputTags().getAll(), + mainOutputTag, + additionalOutputTags.getAll(), context, new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() { @Override @@ -653,11 +704,12 @@ public void translateNode( } private static class WindowAssignTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> { + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + PTransform<PCollection<T>, PCollection<T>>> { @Override public void translateNode( - Window.Assign<T> transform, + PTransform<PCollection<T>, PCollection<T>> transform, FlinkStreamingTranslationContext context) { @SuppressWarnings("unchecked") @@ -686,11 +738,12 @@ public void translateNode( } private static class ReshuffleTranslatorStreaming<K, InputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> { + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>>> { @Override public void translateNode( - Reshuffle<K, InputT> transform, + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>> transform, FlinkStreamingTranslationContext context) { DataStream<WindowedValue<KV<K, InputT>>> inputDataSet = @@ -703,11 +756,12 @@ public void translateNode( private static class GroupByKeyTranslator<K, InputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> { + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> { @Override public void translateNode( - GroupByKey<K, InputT> transform, + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform, FlinkStreamingTranslationContext context) { PCollection<KV<K, InputT>> input = context.getInput(transform); @@ -785,11 +839,11 @@ public void translateNode( private static class CombinePerKeyTranslator<K, InputT, OutputT> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - Combine.PerKey<K, InputT, OutputT>> { + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> { @Override boolean canTranslate( - Combine.PerKey<K, InputT, OutputT> transform, + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform, FlinkStreamingTranslationContext context) { // if we have a merging window strategy and side inputs we cannot @@ -801,12 +855,19 @@ boolean canTranslate( WindowingStrategy<?, BoundedWindow> windowingStrategy = (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); - return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty(); + boolean hasNoSideInputs; + try { + hasNoSideInputs = CombineTranslation.getSideInputs(context.getCurrentTransform()).isEmpty(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return windowingStrategy.getWindowFn().isNonMerging() || hasNoSideInputs; } @Override public void translateNode( - Combine.PerKey<K, InputT, OutputT> transform, + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform, FlinkStreamingTranslationContext context) { PCollection<KV<K, InputT>> input = context.getInput(transform); @@ -843,17 +904,29 @@ public void translateNode( SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); + GlobalCombineFn<? super InputT, ?, OutputT> combineFn; + try { + combineFn = (GlobalCombineFn<? super InputT, ?, OutputT>) + CombineTranslation.getCombineFn(context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining( inputKvCoder.getKeyCoder(), AppliedCombineFn.withInputCoder( - transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder)); + combineFn, input.getPipeline().getCoderRegistry(), inputKvCoder)); Coder<WindowedValue<KV<K, OutputT>>> outputCoder = context.getCoder(context.getOutput(transform)); TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo = context.getTypeInfo(context.getOutput(transform)); - List<PCollectionView<?>> sideInputs = transform.getSideInputs(); + List<PCollectionView<?>> sideInputs; + try { + sideInputs = CombineTranslation.getSideInputs(context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } if (sideInputs.isEmpty()) { @@ -931,18 +1004,18 @@ public void translateNode( private static class GBKIntoKeyedWorkItemsTranslator<K, InputT> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<K, InputT>> { + PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, InputT>>>> { @Override boolean canTranslate( - SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<K, InputT> transform, + PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, InputT>>> transform, FlinkStreamingTranslationContext context) { return true; } @Override public void translateNode( - SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<K, InputT> transform, + PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, InputT>>> transform, FlinkStreamingTranslationContext context) { PCollection<KV<K, InputT>> input = context.getInput(transform); @@ -982,11 +1055,11 @@ public void translateNode( private static class FlattenPCollectionTranslator<T> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - Flatten.PCollections<T>> { + PTransform<PCollection<T>, PCollection<T>>> { @Override public void translateNode( - Flatten.PCollections<T> transform, + PTransform<PCollection<T>, PCollection<T>> transform, FlinkStreamingTranslationContext context) { Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java index 1dc8de91013..9baef8f492a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java @@ -21,10 +21,10 @@ import java.util.List; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.View; /** * {@link PTransform} overrides for Flink runner. @@ -35,15 +35,16 @@ return ImmutableList.<PTransformOverride>builder() .add( PTransformOverride.of( - PTransformMatchers.splittableParDoMulti(), + PTransformMatchers.splittableParDo(), new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory())) .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class), + PTransformMatchers.urnEqualTo( + SplittableParDo.SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN), new SplittableParDoViaKeyedWorkItems.OverrideFactory())) .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(View.CreatePCollectionView.class), + PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN), new CreateStreamingFlinkView.Factory())) .build(); } else { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > FlinkRunner: translate using SDK-agnostic means > ----------------------------------------------- > > Key: BEAM-2795 > URL: https://issues.apache.org/jira/browse/BEAM-2795 > Project: Beam > Issue Type: Sub-task > Components: runner-flink > Reporter: Ben Sidhom > Assignee: Kenneth Knowles > Priority: Major > Labels: portability > -- This message was sent by Atlassian JIRA (v7.6.3#76005)