Repository: beam
Updated Branches:
  refs/heads/master 75728a40c -> 3178f07b9


[BEAM-1772] Support merging WindowFn other than IntervalWindow on Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4a9f60d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4a9f60d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4a9f60d

Branch: refs/heads/master
Commit: d4a9f60df15f22f30869a1e19493b32de2506049
Parents: 75728a4
Author: JingsongLi <lzljs3620...@aliyun.com>
Authored: Mon Mar 27 02:09:32 2017 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Apr 25 15:58:56 2017 +0200

----------------------------------------------------------------------
 .../flink/FlinkBatchTransformTranslators.java   |  78 ++-----
 .../functions/AbstractFlinkCombineRunner.java   | 182 +++++++++++++++++
 .../FlinkMergingNonShuffleReduceFunction.java   | 165 ++-------------
 .../FlinkMergingPartialReduceFunction.java      | 201 -------------------
 .../functions/FlinkMergingReduceFunction.java   | 199 ------------------
 .../functions/FlinkPartialReduceFunction.java   | 114 ++---------
 .../functions/FlinkReduceFunction.java          | 110 ++--------
 .../functions/HashingFlinkCombineRunner.java    | 173 ++++++++++++++++
 .../functions/SortingFlinkCombineRunner.java    | 185 +++++++++++++++++
 9 files changed, 611 insertions(+), 796 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index cb33fc1..99de5be 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -31,8 +31,6 @@ import java.util.Map.Entry;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
-import 
org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
-import 
org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
@@ -62,7 +60,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.Reshuffle;
@@ -75,6 +72,7 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSource;
@@ -221,48 +219,21 @@ class FlinkBatchTransformTranslators {
       Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
           inputDataSet.groupBy(new KvKeySelector<InputT, 
K>(inputCoder.getKeyCoder()));
 
-      FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> 
partialReduceFunction;
-      FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
-
-      if (windowingStrategy.getWindowFn().isNonMerging()) {
-        @SuppressWarnings("unchecked")
-        WindowingStrategy<?, BoundedWindow> boundedStrategy =
-            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
-
-        partialReduceFunction = new FlinkPartialReduceFunction<>(
-            combineFn,
-            boundedStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, 
?>>emptyMap(),
-            context.getPipelineOptions());
-
-        reduceFunction = new FlinkReduceFunction<>(
-            combineFn,
-            boundedStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, 
?>>emptyMap(),
-            context.getPipelineOptions());
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Object, BoundedWindow> boundedStrategy =
+          (WindowingStrategy<Object, BoundedWindow>) windowingStrategy;
 
-      } else {
-        if 
(!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder()))
 {
-          throw new UnsupportedOperationException(
-              "Merging WindowFn with windows other than IntervalWindow are not 
supported.");
-        }
+      FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> 
partialReduceFunction =
+          new FlinkPartialReduceFunction<>(
+              combineFn, boundedStrategy,
+              Collections.<PCollectionView<?>, WindowingStrategy<?, 
?>>emptyMap(),
+              context.getPipelineOptions());
 
-        @SuppressWarnings("unchecked")
-        WindowingStrategy<?, IntervalWindow> intervalStrategy =
-            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
-
-        partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
-            combineFn,
-            intervalStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, 
?>>emptyMap(),
-            context.getPipelineOptions());
-
-        reduceFunction = new FlinkMergingReduceFunction<>(
-            combineFn,
-            intervalStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, 
?>>emptyMap(),
-            context.getPipelineOptions());
-      }
+      FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction =
+          new FlinkReduceFunction<>(
+              combineFn, boundedStrategy,
+              Collections.<PCollectionView<?>, WindowingStrategy<?, 
?>>emptyMap(),
+              context.getPipelineOptions());
 
       // Partially GroupReduce the values into the intermediate format AccumT 
(combine)
       GroupCombineOperator<
@@ -402,9 +373,10 @@ class FlinkBatchTransformTranslators {
         sideInputStrategies.put(sideInput, 
sideInput.getWindowingStrategyInternal());
       }
 
+      WindowingStrategy<Object, BoundedWindow> boundedStrategy =
+          (WindowingStrategy<Object, BoundedWindow>) windowingStrategy;
+
       if (windowingStrategy.getWindowFn().isNonMerging()) {
-        WindowingStrategy<?, BoundedWindow> boundedStrategy =
-            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
 
         FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction 
=
             new FlinkPartialReduceFunction<>(
@@ -449,23 +421,13 @@ class FlinkBatchTransformTranslators {
         context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
       } else {
-        if 
(!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder()))
 {
-          throw new UnsupportedOperationException(
-              "Merging WindowFn with windows other than IntervalWindow are not 
supported.");
-        }
 
         // for merging windows we can't to a pre-shuffle combine step since
         // elements would not be in their correct windows for side-input access
 
-        WindowingStrategy<?, IntervalWindow> intervalStrategy =
-            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
-
-        FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> 
reduceFunction =
-            new FlinkMergingNonShuffleReduceFunction<>(
-                combineFn,
-                intervalStrategy,
-                sideInputStrategies,
-                context.getPipelineOptions());
+        RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, 
WindowedValue<KV<K, OutputT>>>
+            reduceFunction = new FlinkMergingNonShuffleReduceFunction<>(
+                combineFn, boundedStrategy, sideInputStrategies, 
context.getPipelineOptions());
 
         TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
             context.getTypeInfo(context.getOutput(transform));

http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
new file mode 100644
index 0000000..83ff70d
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.util.Collector;
+
+/**
+ * Abstract base for runners that execute a {@link 
org.apache.beam.sdk.transforms.Combine.PerKey}.
+ * This unifies processing of merging/non-merging and partial/final combines.
+ *
+ * <p>The input to {@link #combine(
+ * FlinkCombiner, WindowingStrategy, SideInputReader, PipelineOptions, 
Iterable, Collector)} are
+ * elements of the same key but for different windows.
+ */
+public abstract class AbstractFlinkCombineRunner<
+    K, InputT, AccumT, OutputT, W extends BoundedWindow> {
+
+  /**
+   * Consumes {@link WindowedValue WindowedValues} and emits combined 
output to the given collector.
+   */
+  public abstract void combine(
+      FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,
+      WindowingStrategy<Object, W> windowingStrategy,
+      SideInputReader sideInputReader,
+      PipelineOptions options,
+      Iterable<WindowedValue<KV<K, InputT>>> elements,
+      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception;
+
+  /**
+   * Adapter interface that allows using a {@link 
CombineFnBase.PerKeyCombineFn} to either produce
+   * the {@code AccumT} as output or to combine several accumulators into an 
{@code OutputT}.
+   * The former would be used for a partial combine while the latter is used 
for the final merging
+   * of accumulators.
+   */
+  public interface FlinkCombiner<K, InputT, AccumT, OutputT>{
+
+    AccumT firstInput(K key, InputT value, PipelineOptions options,
+                      SideInputReader sideInputReader, Collection<? extends 
BoundedWindow> windows);
+
+    AccumT addInput(K key, AccumT accumulator, InputT value, PipelineOptions 
options,
+                    SideInputReader sideInputReader, Collection<? extends 
BoundedWindow> windows);
+
+    OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+                          SideInputReader sideInputReader,
+                          Collection<? extends BoundedWindow> windows);
+  }
+
+  /**
+   * A straight wrapper of {@link CombineFnBase.PerKeyCombineFn} that takes in 
{@code InputT}
+   * and emits {@code OutputT}.
+   */
+  public static class CompleteFlinkCombiner<K, InputT, AccumT, OutputT> 
implements
+      FlinkCombiner<K, InputT, AccumT, OutputT> {
+
+    private final PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> 
combineFnRunner;
+
+    public CompleteFlinkCombiner(
+        CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+      combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+    }
+
+    @Override
+    public AccumT firstInput(
+        K key, InputT value, PipelineOptions options, SideInputReader 
sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      AccumT accumulator =
+          combineFnRunner.createAccumulator(key, options, sideInputReader, 
windows);
+      return combineFnRunner.addInput(key, accumulator, value, options, 
sideInputReader, windows);
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT value, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return combineFnRunner.addInput(key, accumulator, value, options, 
sideInputReader, windows);
+    }
+
+    @Override
+    public OutputT extractOutput(
+        K key, AccumT accumulator, PipelineOptions options, SideInputReader 
sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFnRunner.extractOutput(key, accumulator, options, 
sideInputReader, windows);
+    }
+  }
+
+  /**
+   * A partial combiner that takes in {@code InputT} and produces {@code 
AccumT}.
+   */
+  public static class PartialFlinkCombiner<K, InputT, AccumT> implements
+      FlinkCombiner<K, InputT, AccumT, AccumT> {
+
+    private final PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner;
+
+    public PartialFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, InputT, 
AccumT, ?> combineFn) {
+      combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+    }
+
+    @Override
+    public AccumT firstInput(
+        K key, InputT value, PipelineOptions options, SideInputReader 
sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      AccumT accumulator =
+          combineFnRunner.createAccumulator(key, options, sideInputReader, 
windows);
+      return combineFnRunner.addInput(key, accumulator, value, options, 
sideInputReader, windows);
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT value, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return combineFnRunner.addInput(key, accumulator, value, options, 
sideInputReader, windows);
+    }
+
+    @Override
+    public AccumT extractOutput(
+        K key, AccumT accumulator, PipelineOptions options, SideInputReader 
sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return accumulator;
+    }
+  }
+
+  /**
+   * A final combiner that takes in {@code AccumT} and produces {@code 
OutputT}.
+   */
+  public static class FinalFlinkCombiner<K, AccumT, OutputT> implements
+      FlinkCombiner<K, AccumT, AccumT, OutputT> {
+
+    private final PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner;
+
+    public FinalFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, ?, AccumT, 
OutputT> combineFn) {
+      combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+    }
+
+    @Override
+    public AccumT firstInput(
+        K key, AccumT value, PipelineOptions options, SideInputReader 
sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return value;
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, AccumT value, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return combineFnRunner.mergeAccumulators(
+          key, ImmutableList.of(accumulator, value), options, sideInputReader, 
windows);
+    }
+
+    @Override
+    public OutputT extractOutput(
+        K key, AccumT accumulator, PipelineOptions options, SideInputReader 
sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFnRunner.extractOutput(key, accumulator, options, 
sideInputReader, windows);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 26fd0b4..3712598 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -17,46 +17,33 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 
 /**
- * Special version of {@link FlinkReduceFunction} that supports merging 
windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and 
exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
+ * Special version of {@link FlinkReduceFunction} that supports merging 
windows.
  *
  * <p>This is different from the pair of function for the non-merging windows 
case
  * in that we cannot do combining before the shuffle because elements would not
  * yet be in their correct windows for side-input access.
  */
 public class FlinkMergingNonShuffleReduceFunction<
-    K, InputT, AccumT, OutputT, W extends IntervalWindow>
+    K, InputT, AccumT, OutputT, W extends BoundedWindow>
     extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, 
WindowedValue<KV<K, OutputT>>> {
 
   private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> 
combineFn;
 
-  private final WindowingStrategy<?, W> windowingStrategy;
+  private final WindowingStrategy<Object, W> windowingStrategy;
 
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
@@ -64,7 +51,7 @@ public class FlinkMergingNonShuffleReduceFunction<
 
   public FlinkMergingNonShuffleReduceFunction(
       CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
-      WindowingStrategy<?, W> windowingStrategy,
+      WindowingStrategy<Object, W> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions pipelineOptions) {
 
@@ -87,142 +74,22 @@ public class FlinkMergingNonShuffleReduceFunction<
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());
 
-    PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) 
windowingStrategy.getOutputTimeFn();
-
-    // get all elements so that we can sort them, has to fit into
-    // memory
-    // this seems very unprudent, but correct, for now
-    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
-      for (WindowedValue<KV<K, InputT>> exploded : 
inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, 
InputT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, InputT>> o1,
-          WindowedValue<KV<K, InputT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            
.compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // merge windows, we have to do it in an extra pre-processing step and
-    // can't do it as we go since the window of early elements would not
-    // be correct when calling the CombineFn
-    mergeWindow(sortedInput);
-
-    // iterate over the elements that are sorted by window timestamp
-    final Iterator<WindowedValue<KV<K, InputT>>> iterator = 
sortedInput.iterator();
-
-    // create accumulator using the first elements key
-    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
-    InputT firstValue = currentValue.getValue().getValue();
-    AccumT accumulator =
-        combineFnRunner.createAccumulator(key, options, sideInputReader, 
currentValue.getWindows());
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
-        options, sideInputReader, currentValue.getWindows());
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn
-    Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), 
currentWindow);
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (currentWindow.equals(nextWindow)) {
-        // continue accumulating and merge windows
-
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamp = outputTimeFn.combine(
-            windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), 
currentWindow));
-
-      } else {
-        // emit the value that we currently have
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                    options, sideInputReader, currentValue.getWindows())),
-                windowTimestamp,
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.createAccumulator(key,
-            options, sideInputReader, currentValue.getWindows());
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-        windowTimestamp = 
outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
-      }
+    AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner;
 
+    if 
(windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder()))
 {
+      reduceRunner = new SortingFlinkCombineRunner<>();
+    } else {
+      reduceRunner = new HashingFlinkCombineRunner<>();
     }
 
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                options, sideInputReader, currentValue.getWindows())),
-            windowTimestamp,
-            currentWindow,
-            PaneInfo.NO_FIRING));
-  }
+    reduceRunner.combine(
+        new AbstractFlinkCombineRunner.CompleteFlinkCombiner<>(combineFn),
+        windowingStrategy,
+        sideInputReader,
+        options,
+        elements,
+        out);
 
-  /**
-   * Merge windows. This assumes that the list of elements is sorted by 
window-end timestamp.
-   * This replaces windows in the input list.
-   */
-  private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
-    int currentStart = 0;
-    IntervalWindow currentWindow =
-        (IntervalWindow) 
Iterables.getOnlyElement(elements.get(0).getWindows());
-
-    for (int i = 1; i < elements.size(); i++) {
-      WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-      if (currentWindow.intersects(nextWindow)) {
-        // we continue
-        currentWindow = currentWindow.span(nextWindow);
-      } else {
-        // retrofit the merged window to all windows up to "currentStart"
-        for (int j = i - 1; j >= currentStart; j--) {
-          WindowedValue<KV<K, InputT>> value = elements.get(j);
-          elements.set(
-              j,
-              WindowedValue.of(
-                  value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
-        }
-        currentStart = i;
-        currentWindow = nextWindow;
-      }
-    }
-    if (currentStart < elements.size() - 1) {
-      // we have to retrofit the last batch
-      for (int j = elements.size() - 1; j >= currentStart; j--) {
-        WindowedValue<KV<K, InputT>> value = elements.get(j);
-        elements.set(
-            j,
-            WindowedValue.of(
-                value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
-      }
-    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
deleted file mode 100644
index c68f155..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Special version of {@link FlinkPartialReduceFunction} that supports merging windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
- */
-public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow>
-    extends FlinkPartialReduceFunction<K, InputT, AccumT, W> {
-
-  public FlinkMergingPartialReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
-      WindowingStrategy<?, W> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions pipelineOptions) {
-    super(combineFn, windowingStrategy, sideInputs, pipelineOptions);
-  }
-
-  @Override
-  public void combine(
-      Iterable<WindowedValue<KV<K, InputT>>> elements,
-      Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
-
-    PipelineOptions options = serializedOptions.getPipelineOptions();
-
-    FlinkSideInputReader sideInputReader =
-        new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
-    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-    // get all elements so that we can sort them, has to fit into
-    // memory
-    // this seems very unprudent, but correct, for now
-    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
-      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, InputT>> o1,
-          WindowedValue<KV<K, InputT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // merge windows, we have to do it in an extra pre-processing step and
-    // can't do it as we go since the window of early elements would not
-    // be correct when calling the CombineFn
-    mergeWindow(sortedInput);
-
-    // iterate over the elements that are sorted by window timestamp
-    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
-
-    // create accumulator using the first elements key
-    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
-    InputT firstValue = currentValue.getValue().getValue();
-    AccumT accumulator = combineFnRunner.createAccumulator(key,
-        options, sideInputReader, currentValue.getWindows());
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
-        options, sideInputReader, currentValue.getWindows());
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn
-    Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (currentWindow.equals(nextWindow)) {
-        // continue accumulating and merge windows
-
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamp = outputTimeFn.combine(
-            windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
-
-      } else {
-        // emit the value that we currently have
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, accumulator),
-                windowTimestamp,
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.createAccumulator(key,
-            options, sideInputReader, currentValue.getWindows());
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
-      }
-    }
-
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, accumulator),
-            windowTimestamp,
-            currentWindow,
-            PaneInfo.NO_FIRING));
-  }
-
-  /**
-   * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
-   * This replaces windows in the input list.
-   */
-  private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
-    int currentStart = 0;
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
-
-    for (int i = 1; i < elements.size(); i++) {
-      WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-      if (currentWindow.intersects(nextWindow)) {
-        // we continue
-        currentWindow = currentWindow.span(nextWindow);
-      } else {
-        // retrofit the merged window to all windows up to "currentStart"
-        for (int j = i - 1; j >= currentStart; j--) {
-          WindowedValue<KV<K, InputT>> value = elements.get(j);
-          elements.set(
-              j,
-              WindowedValue.of(
-                  value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-        }
-        currentStart = i;
-        currentWindow = nextWindow;
-      }
-    }
-    if (currentStart < elements.size() - 1) {
-      // we have to retrofit the last batch
-      for (int j = elements.size() - 1; j >= currentStart; j--) {
-        WindowedValue<KV<K, InputT>> value = elements.get(j);
-        elements.set(
-            j,
-            WindowedValue.of(
-                value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
deleted file mode 100644
index 84b3adc..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Special version of {@link FlinkReduceFunction} that supports merging windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
- */
-public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow>
-    extends FlinkReduceFunction<K, AccumT, OutputT, W> {
-
-  public FlinkMergingReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
-      WindowingStrategy<?, W> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions pipelineOptions) {
-    super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions);
-  }
-
-  @Override
-  public void reduce(
-      Iterable<WindowedValue<KV<K, AccumT>>> elements,
-      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
-
-    PipelineOptions options = serializedOptions.getPipelineOptions();
-
-    FlinkSideInputReader sideInputReader =
-        new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
-    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-    // get all elements so that we can sort them, has to fit into
-    // memory
-    // this seems very unprudent, but correct, for now
-    ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, AccumT>> inputValue : elements) {
-      for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, AccumT>> o1,
-          WindowedValue<KV<K, AccumT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // merge windows, we have to do it in an extra pre-processing step and
-    // can't do it as we go since the window of early elements would not
-    // be correct when calling the CombineFn
-    mergeWindow(sortedInput);
-
-    // iterate over the elements that are sorted by window timestamp
-    final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
-
-    // get the first accumulator
-    WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
-    AccumT accumulator = currentValue.getValue().getValue();
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn,
-    // in FlinkPartialReduceFunction we already merge the timestamps assigned
-    // to individual elements, here we just merge them
-    List<Instant> windowTimestamps = new ArrayList<>();
-    windowTimestamps.add(currentValue.getTimestamp());
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (nextWindow.equals(currentWindow)) {
-        // continue accumulating and merge windows
-
-        accumulator = combineFnRunner.mergeAccumulators(
-            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamps.add(nextValue.getTimestamp());
-      } else {
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                    options, sideInputReader, currentValue.getWindows())),
-                outputTimeFn.merge(currentWindow, windowTimestamps),
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        windowTimestamps.clear();
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        accumulator = nextValue.getValue().getValue();
-        windowTimestamps.add(nextValue.getTimestamp());
-      }
-    }
-
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                options, sideInputReader, currentValue.getWindows())),
-            outputTimeFn.merge(currentWindow, windowTimestamps),
-            currentWindow,
-            PaneInfo.NO_FIRING));
-  }
-
-  /**
-   * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
-   * This replaces windows in the input list.
-   */
-  private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) {
-    int currentStart = 0;
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
-
-    for (int i = 1; i < elements.size(); i++) {
-      WindowedValue<KV<K, AccumT>> nextValue = elements.get(i);
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-      if (currentWindow.intersects(nextWindow)) {
-        // we continue
-        currentWindow = currentWindow.span(nextWindow);
-      } else {
-        // retrofit the merged window to all windows up to "currentStart"
-        for (int j = i - 1; j >= currentStart; j--) {
-          WindowedValue<KV<K, AccumT>> value = elements.get(j);
-          elements.set(
-              j,
-              WindowedValue.of(
-                  value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-        }
-        currentStart = i;
-        currentWindow = nextWindow;
-      }
-    }
-    if (currentStart < elements.size() - 1) {
-      // we have to retrofit the last batch
-      for (int j = elements.size() - 1; j >= currentStart; j--) {
-        WindowedValue<KV<K, AccumT>> value = elements.get(j);
-        elements.set(
-            j,
-            WindowedValue.of(
-                value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 1d1ff9f..9a44840 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -17,28 +17,18 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
 import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.flink.api.common.functions.RichGroupCombineFunction;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 
 /**
  * This is is the first step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
@@ -54,7 +44,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
 
   protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
 
-  protected final WindowingStrategy<?, W> windowingStrategy;
+  protected final WindowingStrategy<Object, W> windowingStrategy;
 
   protected final SerializedPipelineOptions serializedOptions;
 
@@ -62,7 +52,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
 
   public FlinkPartialReduceFunction(
       CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
-      WindowingStrategy<?, W> windowingStrategy,
+      WindowingStrategy<Object, W> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions pipelineOptions) {
 
@@ -83,90 +73,22 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());
 
-    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-    // get all elements so that we can sort them, has to fit into
-    // memory
-    // this seems very unprudent, but correct, for now
-    ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
-      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, InputT>> o1,
-          WindowedValue<KV<K, InputT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // iterate over the elements that are sorted by window timestamp
-    //
-    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
-
-    // create accumulator using the first elements key
-    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
-    InputT firstValue = currentValue.getValue().getValue();
-    AccumT accumulator = combineFnRunner.createAccumulator(key,
-        options, sideInputReader, currentValue.getWindows());
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
-        options, sideInputReader, currentValue.getWindows());
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn
-    Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (nextWindow.equals(currentWindow)) {
-        // continue accumulating
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamp = outputTimeFn.combine(
-            windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
-
-      } else {
-        // emit the value that we currently have
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, accumulator),
-                windowTimestamp,
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.createAccumulator(key,
-            options, sideInputReader, currentValue.getWindows());
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
-      }
+    AbstractFlinkCombineRunner<K, InputT, AccumT, AccumT, W> reduceRunner;
+
+    if (!windowingStrategy.getWindowFn().isNonMerging()
+        && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+      reduceRunner = new HashingFlinkCombineRunner<>();
+    } else {
+      reduceRunner = new SortingFlinkCombineRunner<>();
     }
 
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, accumulator),
-            windowTimestamp,
-            currentWindow,
-            PaneInfo.NO_FIRING));
+    reduceRunner.combine(
+        new AbstractFlinkCombineRunner.PartialFlinkCombiner<>(combineFn),
+        windowingStrategy,
+        sideInputReader,
+        options,
+        elements,
+        out);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 3e4f742..6c1a2e4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -17,30 +17,18 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 
 /**
  * This is the second part for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
@@ -56,7 +44,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
   protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
 
-  protected final WindowingStrategy<?, W> windowingStrategy;
+  protected final WindowingStrategy<Object, W> windowingStrategy;
 
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
@@ -64,7 +52,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
   public FlinkReduceFunction(
       CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
-      WindowingStrategy<?, W> windowingStrategy,
+      WindowingStrategy<Object, W> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions pipelineOptions) {
 
@@ -87,87 +75,23 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());
 
-    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
+    AbstractFlinkCombineRunner<K, AccumT, AccumT, OutputT, W> reduceRunner;
 
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-
-    // get all elements so that we can sort them, has to fit into
-    // memory
-    // this seems very unprudent, but correct, for now
-    ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, AccumT>> inputValue: elements) {
-      for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
+    if (!windowingStrategy.getWindowFn().isNonMerging()
+        && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+      reduceRunner = new HashingFlinkCombineRunner<>();
+    } else {
+      reduceRunner = new SortingFlinkCombineRunner<>();
     }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, AccumT>> o1,
-          WindowedValue<KV<K, AccumT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // iterate over the elements that are sorted by window timestamp
-    //
-    final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
-
-    // get the first accumulator
-    WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
-    AccumT accumulator = currentValue.getValue().getValue();
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn,
-    // in FlinkPartialReduceFunction we already merge the timestamps assigned
-    // to individual elements, here we just merge them
-    List<Instant> windowTimestamps = new ArrayList<>();
-    windowTimestamps.add(currentValue.getTimestamp());
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
-      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (nextWindow.equals(currentWindow)) {
-        // continue accumulating
-        accumulator = combineFnRunner.mergeAccumulators(
-            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamps.add(nextValue.getTimestamp());
-      } else {
-        // emit the value that we currently have
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                    options, sideInputReader, currentValue.getWindows())),
-                outputTimeFn.merge(currentWindow, windowTimestamps),
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        windowTimestamps.clear();
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        accumulator = nextValue.getValue().getValue();
-        windowTimestamps.add(nextValue.getTimestamp());
-      }
 
-    }
+    reduceRunner.combine(
+        new AbstractFlinkCombineRunner.FinalFlinkCombiner<>(combineFn),
+        windowingStrategy,
+        sideInputReader,
+        options,
+        elements,
+        out);
 
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                options, sideInputReader, currentValue.getWindows())),
-            outputTimeFn.merge(currentWindow, windowTimestamps),
-            currentWindow,
-            PaneInfo.NO_FIRING));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
new file mode 100644
index 0000000..b904bfe
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * A Flink combine runner that builds a map of merged windows and produces output after
+ * seeing all input. This is similar to what {@link org.apache.beam.runners.core.ReduceFnRunner}
+ * does.
+ */
+public class HashingFlinkCombineRunner<
+    K, InputT, AccumT, OutputT, W extends BoundedWindow>
+    extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {
+
+  @Override
+  public void combine(
+      FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,
+      WindowingStrategy<Object, W> windowingStrategy,
+      SideInputReader sideInputReader,
+      PipelineOptions options,
+      Iterable<WindowedValue<KV<K, InputT>>> elements,
+      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
+
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) 
windowingStrategy.getOutputTimeFn();
+
+    // Flink Iterable can be iterated over only once.
+    List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>();
+    Iterables.addAll(inputs, elements);
+
+    Set<W> windows = collectWindows(inputs);
+    Map<W, W> windowToMergeResult = mergeWindows(windowingStrategy, windows);
+
+    // Combine all windowedValues into map
+    Map<W, Tuple2<AccumT, Instant>> mapState = new HashMap<>();
+    Iterator<WindowedValue<KV<K, InputT>>> iterator = inputs.iterator();
+    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
+    K key = currentValue.getValue().getKey();
+    do {
+      for (BoundedWindow w : currentValue.getWindows()) {
+        @SuppressWarnings("unchecked")
+        W currentWindow = (W) w;
+        W mergedWindow = windowToMergeResult.get(currentWindow);
+        mergedWindow = mergedWindow == null ? currentWindow : mergedWindow;
+        Set<W> singletonW = Collections.singleton(mergedWindow);
+        Tuple2<AccumT, Instant> accumAndInstant = mapState.get(mergedWindow);
+        if (accumAndInstant == null) {
+          AccumT accumT = flinkCombiner.firstInput(key, 
currentValue.getValue().getValue(),
+              options, sideInputReader, singletonW);
+          Instant windowTimestamp =
+              outputTimeFn.assignOutputTime(currentValue.getTimestamp(), 
mergedWindow);
+          accumAndInstant = new Tuple2<>(accumT, windowTimestamp);
+          mapState.put(mergedWindow, accumAndInstant);
+        } else {
+          accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0,
+              currentValue.getValue().getValue(), options, sideInputReader, 
singletonW);
+          accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1,
+              outputTimeFn.assignOutputTime(currentValue.getTimestamp(), 
mergedWindow));
+        }
+      }
+      if (iterator.hasNext()) {
+        currentValue = iterator.next();
+      } else {
+        break;
+      }
+    } while (true);
+
+    // Output the final value of combiners
+    for (Map.Entry<W, Tuple2<AccumT, Instant>> entry : mapState.entrySet()) {
+      AccumT accumulator = entry.getValue().f0;
+      Instant windowTimestamp = entry.getValue().f1;
+      out.collect(
+          WindowedValue.of(
+              KV.of(key, flinkCombiner.extractOutput(key, accumulator,
+                  options, sideInputReader, 
Collections.singleton(entry.getKey()))),
+              windowTimestamp,
+              entry.getKey(),
+              PaneInfo.NO_FIRING));
+    }
+
+  }
+
+  private Map<W, W> mergeWindows(
+      WindowingStrategy<Object, W> windowingStrategy,
+      Set<W> windows) throws Exception {
+    WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
+
+    if (windowingStrategy.getWindowFn().isNonMerging()) {
+      // Return an empty map, indicating that no window gets merged.
+      return Collections.emptyMap();
+    }
+
+    Map<W, W> windowToMergeResult = new HashMap<>();
+    windowFn.mergeWindows(new MergeContextImpl(windowFn, windows, 
windowToMergeResult));
+    return windowToMergeResult;
+  }
+
+  private class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
+
+    private Set<W> windows;
+    private Map<W, W> windowToMergeResult;
+
+    MergeContextImpl(WindowFn<Object, W> windowFn, Set<W> windows, Map<W, W> 
windowToMergeResult) {
+      windowFn.super();
+      this.windows = windows;
+      this.windowToMergeResult = windowToMergeResult;
+    }
+
+    @Override
+    public Collection<W> windows() {
+      return windows;
+    }
+
+    @Override
+    public void merge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
+      for (W w : toBeMerged) {
+        windowToMergeResult.put(w, mergeResult);
+      }
+    }
+  }
+
+  private Set<W> collectWindows(Iterable<WindowedValue<KV<K, InputT>>> values) 
{
+    Set<W> windows = new HashSet<>();
+    for (WindowedValue<?> value : values) {
+      for (BoundedWindow untypedWindow : value.getWindows()) {
+        @SuppressWarnings("unchecked")
+        W window = (W) untypedWindow;
+        windows.add(window);
+      }
+    }
+    return windows;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
new file mode 100644
index 0000000..2967f2c
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * A Flink combine runner that first sorts the elements by window and then does one pass that
+ * merges windows and outputs results.
+ */
+public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends 
BoundedWindow>
+    extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {
+
+
+  @Override
+  public void combine(
+      FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,
+      WindowingStrategy<Object, W> windowingStrategy,
+      SideInputReader sideInputReader,
+      PipelineOptions options,
+      Iterable<WindowedValue<KV<K, InputT>>> elements,
+      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) 
windowingStrategy.getOutputTimeFn();
+
+    // Get all elements so that we can sort them. The whole group has to
+    // fit into memory.
+    // This seems very imprudent, but it is correct, for now.
+    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
+    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
+      for (WindowedValue<KV<K, InputT>> exploded : 
inputValue.explodeWindows()) {
+        sortedInput.add(exploded);
+      }
+    }
+    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, 
InputT>>>() {
+      @Override
+      public int compare(
+          WindowedValue<KV<K, InputT>> o1,
+          WindowedValue<KV<K, InputT>> o2) {
+        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+            
.compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+      }
+    });
+
+    if (!windowingStrategy.getWindowFn().isNonMerging()) {
+      // merge windows, we have to do it in an extra pre-processing step and
+      // can't do it as we go since the window of early elements would not
+      // be correct when calling the CombineFn
+      mergeWindow(sortedInput);
+    }
+
+    // iterate over the elements that are sorted by window timestamp
+    final Iterator<WindowedValue<KV<K, InputT>>> iterator = 
sortedInput.iterator();
+
+    // create accumulator using the first element's key
+    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
+    K key = currentValue.getValue().getKey();
+    BoundedWindow currentWindow = 
Iterables.getOnlyElement(currentValue.getWindows());
+    InputT firstValue = currentValue.getValue().getValue();
+    AccumT accumulator = flinkCombiner.firstInput(
+        key, firstValue, options, sideInputReader, currentValue.getWindows());
+
+    // we use this to keep track of the timestamps assigned by the OutputTimeFn
+    Instant windowTimestamp =
+        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), 
currentWindow);
+
+    while (iterator.hasNext()) {
+      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
+      BoundedWindow nextWindow = 
Iterables.getOnlyElement(nextValue.getWindows());
+
+      if (currentWindow.equals(nextWindow)) {
+        // still in the same window: continue accumulating
+
+        InputT value = nextValue.getValue().getValue();
+        accumulator = flinkCombiner.addInput(key, accumulator, value,
+            options, sideInputReader, currentValue.getWindows());
+
+        windowTimestamp = outputTimeFn.combine(
+            windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), 
currentWindow));
+
+      } else {
+        // emit the value that we currently have
+        out.collect(
+            WindowedValue.of(
+                KV.of(key, flinkCombiner.extractOutput(key, accumulator,
+                    options, sideInputReader, currentValue.getWindows())),
+                windowTimestamp,
+                currentWindow,
+                PaneInfo.NO_FIRING));
+
+        currentWindow = nextWindow;
+        currentValue = nextValue;
+        InputT value = nextValue.getValue().getValue();
+        accumulator = flinkCombiner.firstInput(key, value,
+            options, sideInputReader, currentValue.getWindows());
+        windowTimestamp = 
outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+      }
+
+    }
+
+    // emit the final accumulator
+    out.collect(
+        WindowedValue.of(
+            KV.of(key, flinkCombiner.extractOutput(key, accumulator,
+                options, sideInputReader, currentValue.getWindows())),
+            windowTimestamp,
+            currentWindow,
+            PaneInfo.NO_FIRING));
+  }
+
+  /**
+   * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
+   * This replaces windows in the input list.
+   */
+  private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
+    int currentStart = 0;
+    IntervalWindow currentWindow =
+        (IntervalWindow) 
Iterables.getOnlyElement(elements.get(0).getWindows());
+
+    for (int i = 1; i < elements.size(); i++) {
+      WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
+      IntervalWindow nextWindow =
+          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+      if (currentWindow.intersects(nextWindow)) {
+        // we continue
+        currentWindow = currentWindow.span(nextWindow);
+      } else {
+        // retrofit the merged window to all windows up to "currentStart"
+        for (int j = i - 1; j >= currentStart; j--) {
+          WindowedValue<KV<K, InputT>> value = elements.get(j);
+          elements.set(
+              j,
+              WindowedValue.of(
+                  value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
+        }
+        currentStart = i;
+        currentWindow = nextWindow;
+      }
+    }
+    if (currentStart < elements.size() - 1) {
+      // we have to retrofit the last batch
+      for (int j = elements.size() - 1; j >= currentStart; j--) {
+        WindowedValue<KV<K, InputT>> value = elements.get(j);
+        elements.set(
+            j,
+            WindowedValue.of(
+                value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
+      }
+    }
+  }
+}

Reply via email to