This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 875f3e1 Translate Flink batch nodes by URN 875f3e1 is described below commit 875f3e1b0c943b56bda99e610ede872a07573be5 Author: Ben Sidhom <sid...@google.com> AuthorDate: Mon Dec 18 11:26:36 2017 -0800 Translate Flink batch nodes by URN Before this change, the translator downcasts pipeline transforms. This cannot be done with jobs that have been submitted portably through the Job API. This change uses URNs (available on rehydrated pipelines) for translation. --- .../flink/FlinkBatchTransformTranslators.java | 27 ++++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 3689698..a4265f5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -28,6 +28,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction; import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction; @@ -96,29 +98,34 @@ class FlinkBatchTransformTranslators { @SuppressWarnings("rawtypes") private static final Map< - Class<? 
extends PTransform>, + String, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>(); static { - TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, + new CreatePCollectionViewTranslatorBatch()); - TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch()); - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch()); - TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.COMBINE_TRANSFORM_URN, + new CombinePerKeyTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, + new GroupByKeyTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch()); - TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, + new FlattenPCollectionTranslatorBatch()); - TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.WINDOW_TRANSFORM_URN, new WindowAssignTranslatorBatch()); - TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch()); - TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch()); } static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator( PTransform<?, ?> transform) { - return TRANSLATORS.get(transform.getClass()); + @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform); + return urn == null ? 
null : TRANSLATORS.get(urn); } private static class ReadSourceTranslatorBatch<T> -- To stop receiving notification emails like this one, please contact "commits@beam.apache.org" <commits@beam.apache.org>.