This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c27372  Remove unused Flink assignWindows translator and workarounds.
     new 7e186f0  Merge pull request #8058 Remove unused Flink assignWindows 
translator and workarounds.
3c27372 is described below

commit 3c273724e432bfc4e55326cec61e2939fa4c16b5
Author: Robert Bradshaw <rober...@google.com>
AuthorDate: Thu Mar 14 14:40:26 2019 +0100

    Remove unused Flink assignWindows translator and workarounds.
---
 .../core/construction/graph/PipelineTrimmer.java   | 11 +----
 .../FlinkBatchPortablePipelineTranslator.java      | 51 ----------------------
 .../FlinkStreamingPortablePipelineTranslator.java  | 40 -----------------
 3 files changed, 1 insertion(+), 101 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
index f3edaad..894a5d1 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
@@ -20,9 +20,6 @@ package org.apache.beam.runners.core.construction.graph;
 import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,13 +37,7 @@ public class PipelineTrimmer {
    * @return the trimmed pipeline
    */
   public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) {
-    RunnerApi.Pipeline trimmedPipeline =
-        makeKnownUrnsPrimitives(
-            pipeline,
-            // The fuser should fuse AssignWindows into the graph, so we don't 
handle it here.
-            Sets.difference(
-                knownUrns, 
ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));
-    return trimmedPipeline;
+    return makeKnownUrnsPrimitives(pipeline, knownUrns);
   }
 
   private static RunnerApi.Pipeline makeKnownUrnsPrimitives(
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 496d737..745a784 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -48,7 +48,6 @@ import 
org.apache.beam.runners.core.construction.graph.PipelineNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
@@ -71,7 +70,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
@@ -147,9 +145,6 @@ public class FlinkBatchPortablePipelineTranslator
         PTransformTranslation.IMPULSE_TRANSFORM_URN,
         FlinkBatchPortablePipelineTranslator::translateImpulse);
     translatorMap.put(
-        PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
-        FlinkBatchPortablePipelineTranslator::translateAssignWindows);
-    translatorMap.put(
         ExecutableStage.URN, 
FlinkBatchPortablePipelineTranslator::translateExecutableStage);
     translatorMap.put(
         PTransformTranslation.RESHUFFLE_URN,
@@ -286,52 +281,6 @@ public class FlinkBatchPortablePipelineTranslator
         inputDataSet.rebalance());
   }
 
-  private static <T> void translateAssignWindows(
-      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
-    RunnerApi.Components components = pipeline.getComponents();
-    RunnerApi.WindowIntoPayload payload;
-    try {
-      payload =
-          
RunnerApi.WindowIntoPayload.parseFrom(transform.getTransform().getSpec().getPayload());
-    } catch (InvalidProtocolBufferException e) {
-      throw new IllegalArgumentException(e);
-    }
-    // TODO: https://issues.apache.org/jira/browse/BEAM-4296
-    // This only works for well known window fns, we should defer this 
execution to the SDK
-    // if the WindowFn can't be parsed or just defer it all the time.
-    WindowFn<T, ? extends BoundedWindow> windowFn =
-        (WindowFn<T, ? extends BoundedWindow>)
-            
WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn());
-
-    String inputCollectionId =
-        
Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
-    String outputCollectionId =
-        
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values());
-    PCollectionNode collectionNode =
-        PipelineNode.pCollection(
-            outputCollectionId, 
components.getPcollectionsOrThrow(outputCollectionId));
-    Coder<WindowedValue<T>> outputCoder;
-    try {
-      outputCoder = WireCoders.instantiateRunnerWireCoder(collectionNode, 
components);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    TypeInformation<WindowedValue<T>> resultTypeInfo = new 
CoderTypeInformation<>(outputCoder);
-
-    DataSet<WindowedValue<T>> inputDataSet = 
context.getDataSetOrThrow(inputCollectionId);
-
-    FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
-        new FlinkAssignWindows<>(windowFn);
-
-    DataSet<WindowedValue<T>> resultDataSet =
-        inputDataSet
-            .flatMap(assignWindowsFunction)
-            .name(transform.getTransform().getUniqueName())
-            .returns(resultTypeInfo);
-
-    context.addDataSet(outputCollectionId, resultDataSet);
-  }
-
   private static <InputT> void translateExecutableStage(
       PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
     // TODO: Fail on splittable DoFns.
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index a2c4f92..e6018d5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -45,7 +45,6 @@ import 
org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import 
org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -69,7 +68,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
@@ -193,8 +191,6 @@ public class FlinkStreamingPortablePipelineTranslator
     translatorMap.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, 
this::translateGroupByKey);
     translatorMap.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, 
this::translateImpulse);
     translatorMap.put(STREAMING_IMPULSE_TRANSFORM_URN, 
this::translateStreamingImpulse);
-    translatorMap.put(
-        PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, 
this::translateAssignWindows);
     translatorMap.put(ExecutableStage.URN, this::translateExecutableStage);
     translatorMap.put(PTransformTranslation.RESHUFFLE_URN, 
this::translateReshuffle);
 
@@ -475,42 +471,6 @@ public class FlinkStreamingPortablePipelineTranslator
     
context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
 source);
   }
 
-  private <T> void translateAssignWindows(
-      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext 
context) {
-    RunnerApi.Components components = pipeline.getComponents();
-    RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
-    RunnerApi.WindowIntoPayload payload;
-    try {
-      payload = 
RunnerApi.WindowIntoPayload.parseFrom(transform.getSpec().getPayload());
-    } catch (InvalidProtocolBufferException e) {
-      throw new IllegalArgumentException(e);
-    }
-    // TODO: https://issues.apache.org/jira/browse/BEAM-4296
-    // This only works for well known window fns, we should defer this 
execution to the SDK
-    // if the WindowFn can't be parsed or just defer it all the time.
-    WindowFn<T, ? extends BoundedWindow> windowFn =
-        (WindowFn<T, ? extends BoundedWindow>)
-            
WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn());
-
-    String inputCollectionId = 
Iterables.getOnlyElement(transform.getInputsMap().values());
-    String outputCollectionId = 
Iterables.getOnlyElement(transform.getOutputsMap().values());
-    Coder<WindowedValue<T>> outputCoder = instantiateCoder(outputCollectionId, 
components);
-    TypeInformation<WindowedValue<T>> resultTypeInfo = new 
CoderTypeInformation<>(outputCoder);
-
-    DataStream<WindowedValue<T>> inputDataStream = 
context.getDataStreamOrThrow(inputCollectionId);
-
-    FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
-        new FlinkAssignWindows<>(windowFn);
-
-    DataStream<WindowedValue<T>> resultDataStream =
-        inputDataStream
-            .flatMap(assignWindowsFunction)
-            .name(transform.getUniqueName())
-            .returns(resultTypeInfo);
-
-    context.addDataStream(outputCollectionId, resultDataStream);
-  }
-
   private <InputT, OutputT> void translateExecutableStage(
       String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext 
context) {
     // TODO: Fail on splittable DoFns.

Reply via email to