This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3c27372 Remove unused Flink assignWindows translator and workarounds. new 7e186f0 Merge pull request #8058 Remove unused Flink assignWindows translator and workarounds. 3c27372 is described below commit 3c273724e432bfc4e55326cec61e2939fa4c16b5 Author: Robert Bradshaw <rober...@google.com> AuthorDate: Thu Mar 14 14:40:26 2019 +0100 Remove unused Flink assignWindows translator and workarounds. --- .../core/construction/graph/PipelineTrimmer.java | 11 +---- .../FlinkBatchPortablePipelineTranslator.java | 51 ---------------------- .../FlinkStreamingPortablePipelineTranslator.java | 40 ----------------- 3 files changed, 1 insertion(+), 101 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java index f3edaad..894a5d1 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java @@ -20,9 +20,6 @@ package org.apache.beam.runners.core.construction.graph; import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; -import org.apache.beam.runners.core.construction.PTransformTranslation; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +37,7 @@ public class PipelineTrimmer { * @return the trimmed pipeline */ public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) { - RunnerApi.Pipeline trimmedPipeline = - makeKnownUrnsPrimitives( - pipeline, - // The fuser should fuse AssignWindows into the graph, so we don't handle it here. - Sets.difference( - knownUrns, ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN))); - return trimmedPipeline; + return makeKnownUrnsPrimitives(pipeline, knownUrns); } private static RunnerApi.Pipeline makeKnownUrnsPrimitives( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index 496d737..745a784 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -48,7 +48,6 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; import org.apache.beam.runners.core.construction.graph.QueryablePipeline; -import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext; import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction; import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction; @@ -71,7 +70,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.KV; @@ -147,9 +145,6 @@ public class FlinkBatchPortablePipelineTranslator PTransformTranslation.IMPULSE_TRANSFORM_URN, FlinkBatchPortablePipelineTranslator::translateImpulse); translatorMap.put( - PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, - FlinkBatchPortablePipelineTranslator::translateAssignWindows); - translatorMap.put( ExecutableStage.URN, FlinkBatchPortablePipelineTranslator::translateExecutableStage); translatorMap.put( PTransformTranslation.RESHUFFLE_URN, @@ -286,52 +281,6 @@ public class FlinkBatchPortablePipelineTranslator inputDataSet.rebalance()); } - private static <T> void translateAssignWindows( - PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) { - RunnerApi.Components components = pipeline.getComponents(); - RunnerApi.WindowIntoPayload payload; - try { - payload = - RunnerApi.WindowIntoPayload.parseFrom(transform.getTransform().getSpec().getPayload()); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException(e); - } - // TODO: https://issues.apache.org/jira/browse/BEAM-4296 - // This only works for well known window fns, we should defer this execution to the SDK - // if the WindowFn can't be parsed or just defer it all the time. - WindowFn<T, ? extends BoundedWindow> windowFn = - (WindowFn<T, ? extends BoundedWindow>) - WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn()); - - String inputCollectionId = - Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()); - String outputCollectionId = - Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()); - PCollectionNode collectionNode = - PipelineNode.pCollection( - outputCollectionId, components.getPcollectionsOrThrow(outputCollectionId)); - Coder<WindowedValue<T>> outputCoder; - try { - outputCoder = WireCoders.instantiateRunnerWireCoder(collectionNode, components); - } catch (IOException e) { - throw new RuntimeException(e); - } - TypeInformation<WindowedValue<T>> resultTypeInfo = new CoderTypeInformation<>(outputCoder); - - DataSet<WindowedValue<T>> inputDataSet = context.getDataSetOrThrow(inputCollectionId); - - FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction = - new FlinkAssignWindows<>(windowFn); - - DataSet<WindowedValue<T>> resultDataSet = - inputDataSet - .flatMap(assignWindowsFunction) - .name(transform.getTransform().getUniqueName()) - .returns(resultTypeInfo); - - context.addDataSet(outputCollectionId, resultDataSet); - } - private static <InputT> void translateExecutableStage( PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) { // TODO: Fail on splittable DoFns. diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index a2c4f92..e6018d5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -45,7 +45,6 @@ import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.core.construction.graph.PipelineNode; import org.apache.beam.runners.core.construction.graph.QueryablePipeline; -import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext; import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; @@ -69,7 +68,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.KV; @@ -193,8 +191,6 @@ public class FlinkStreamingPortablePipelineTranslator translatorMap.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, this::translateGroupByKey); translatorMap.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, this::translateImpulse); translatorMap.put(STREAMING_IMPULSE_TRANSFORM_URN, this::translateStreamingImpulse); - translatorMap.put( - PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, this::translateAssignWindows); translatorMap.put(ExecutableStage.URN, this::translateExecutableStage); translatorMap.put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle); @@ -475,42 +471,6 @@ public class FlinkStreamingPortablePipelineTranslator context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()), source); } - private <T> void translateAssignWindows( - String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { - RunnerApi.Components components = pipeline.getComponents(); - RunnerApi.PTransform transform = components.getTransformsOrThrow(id); - RunnerApi.WindowIntoPayload payload; - try { - payload = RunnerApi.WindowIntoPayload.parseFrom(transform.getSpec().getPayload()); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException(e); - } - // TODO: https://issues.apache.org/jira/browse/BEAM-4296 - // This only works for well known window fns, we should defer this execution to the SDK - // if the WindowFn can't be parsed or just defer it all the time. - WindowFn<T, ? extends BoundedWindow> windowFn = - (WindowFn<T, ? extends BoundedWindow>) - WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn()); - - String inputCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values()); - String outputCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values()); - Coder<WindowedValue<T>> outputCoder = instantiateCoder(outputCollectionId, components); - TypeInformation<WindowedValue<T>> resultTypeInfo = new CoderTypeInformation<>(outputCoder); - - DataStream<WindowedValue<T>> inputDataStream = context.getDataStreamOrThrow(inputCollectionId); - - FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction = - new FlinkAssignWindows<>(windowFn); - - DataStream<WindowedValue<T>> resultDataStream = - inputDataStream - .flatMap(assignWindowsFunction) - .name(transform.getUniqueName()) - .returns(resultTypeInfo); - - context.addDataStream(outputCollectionId, resultDataStream); - } - private <InputT, OutputT> void translateExecutableStage( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { // TODO: Fail on splittable DoFns.