Repository: incubator-beam
Updated Branches:
  refs/heads/master 6c34f3a34 -> c26eef5be


Move GroupByKey expansion into DirectPipelineRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca5b2def
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca5b2def
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca5b2def

Branch: refs/heads/master
Commit: ca5b2def2bf77bd841fc670b167fe7ba37801603
Parents: 706fc53
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Mar 24 15:26:37 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Apr 5 13:54:03 2016 -0700

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java         |  26 +-
 .../beam/runners/spark/SparkPipelineRunner.java |  20 ++
 .../spark/translation/TransformTranslator.java  |  22 +-
 .../sdk/runners/DirectPipelineRunner.java       | 116 +++++++
 .../inprocess/GroupByKeyEvaluatorFactory.java   |   2 +-
 .../dataflow/sdk/transforms/GroupByKey.java     | 310 +------------------
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   | 245 +++++++++++++++
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java |   1 -
 .../GroupByKeyEvaluatorFactoryTest.java         |   4 +-
 9 files changed, 404 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 48c783d..b3c0cea 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -96,7 +96,7 @@ public class FlinkBatchTransformTranslators {
   // 
--------------------------------------------------------------------------------------------
   //  Transform Translator Registry
   // 
--------------------------------------------------------------------------------------------
-  
+
   @SuppressWarnings("rawtypes")
   private static final Map<Class<? extends PTransform>, 
FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new 
HashMap<>();
 
@@ -112,7 +112,6 @@ public class FlinkBatchTransformTranslators {
 
     TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslatorBatch());
 
-    TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new 
GroupByKeyOnlyTranslatorBatch());
     // TODO we're currently ignoring windows here but that has to change in 
the future
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
 
@@ -302,25 +301,8 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class GroupByKeyOnlyTranslatorBatch<K, V> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K,
 V>> {
-
-    @Override
-    public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, 
FlinkBatchTranslationContext context) {
-      DataSet<KV<K, V>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-      GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = 
new FlinkKeyedListAggregationFunction<>();
-
-      TypeInformation<KV<K, Iterable<V>>> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
-
-      Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
-
-      GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
-          new GroupReduceOperator<>(grouping, typeInformation, 
groupReduceFunction, transform.getName());
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-    }
-  }
-
   /**
-   * Translates a GroupByKey while ignoring window assignments. This is 
identical to the {@link GroupByKeyOnlyTranslatorBatch}
+   * Translates a GroupByKey while ignoring window assignments. Current 
ignores windows.
    */
   private static class GroupByKeyTranslatorBatch<K, V> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
 
@@ -406,7 +388,7 @@ public class FlinkBatchTransformTranslators {
 //      context.setOutputDataSet(transform.getOutput(), outputDataSet);
 //    }
 //  }
-  
+
   private static class ParDoBoundTranslatorBatch<IN, OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
 
@@ -589,6 +571,6 @@ public class FlinkBatchTransformTranslators {
   // 
--------------------------------------------------------------------------------------------
   //  Miscellaneous
   // 
--------------------------------------------------------------------------------------------
-  
+
   private FlinkBatchTransformTranslators() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index d5e4186..71e358c 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -23,7 +23,10 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.cloud.dataflow.sdk.values.PValue;
@@ -105,6 +108,23 @@ public final class SparkPipelineRunner extends 
PipelineRunner<EvaluationResult>
   }
 
   /**
+   * Overrides for this runner.
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public <OT extends POutput, IT extends PInput> OT apply(
+      PTransform<IT, OT> transform, IT input) {
+
+    if (transform instanceof GroupByKey) {
+      return (OT) ((PCollection) input).apply(
+          new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+    } else {
+      return super.apply(transform, input);
+    }
+  }
+
+
+  /**
    * No parameter constructor defaults to running this pipeline in Spark's 
local mode, in a single
    * thread.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7f72235..ac59d00 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -18,11 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
-import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
-import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
-import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -30,7 +25,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import com.google.api.client.util.Lists;
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
+import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
+
 import com.google.api.client.util.Maps;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -41,7 +40,6 @@ import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.View;
@@ -50,6 +48,7 @@ import 
com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
 import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
+import 
com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -82,6 +81,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+
 import scala.Tuple2;
 
 /**
@@ -130,10 +130,10 @@ public final class TransformTranslator {
     };
   }
 
-  private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> 
gbk() {
-    return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() {
+  private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbk() {
+    return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
       @Override
-      public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, 
EvaluationContext context) {
+      public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext 
context) {
         @SuppressWarnings("unchecked")
         JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD =
             (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) 
context.getInputRDD(transform);
@@ -776,7 +776,7 @@ public final class TransformTranslator {
     EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
     EVALUATORS.put(ParDo.Bound.class, parDo());
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
-    EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk());
+    EVALUATORS.put(GroupByKeyOnly.class, gbk());
     EVALUATORS.put(Combine.GroupedValues.class, grouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 872cfef..417420a 100644
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -16,6 +16,7 @@
 
 package com.google.cloud.dataflow.sdk.runners;
 
+import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -24,6 +25,7 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
 import com.google.cloud.dataflow.sdk.PipelineResult;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.coders.ListCoder;
 import com.google.cloud.dataflow.sdk.io.AvroIO;
 import com.google.cloud.dataflow.sdk.io.FileBasedSink;
@@ -38,12 +40,15 @@ import 
com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.Partition;
 import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import 
com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
 import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner;
@@ -71,6 +76,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -138,6 +144,8 @@ public class DirectPipelineRunner
     }
   }
 
+  /////////////////////////////////////////////////////////////////////////////
+
   /**
    * Records that instances of the specified PTransform class
    * should be evaluated by the corresponding TransformEvaluator.
@@ -243,6 +251,9 @@ public class DirectPipelineRunner
       return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, 
(PCollection<?>) input);
     } else if (transform instanceof AvroIO.Write.Bound) {
       return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, 
(PCollection<?>) input);
+    } else if (transform instanceof GroupByKey) {
+      return (OutputT)
+          ((PCollection) input).apply(new 
GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
     } else {
       return super.apply(transform, input);
     }
@@ -1117,6 +1128,39 @@ public class DirectPipelineRunner
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * The key by which GBK groups inputs - elements are grouped by the encoded 
form of the key,
+   * but the original key may be accessed as well.
+   */
+  private static class GroupingKey<K> {
+    private K key;
+    private byte[] encodedKey;
+
+    public GroupingKey(K key, byte[] encodedKey) {
+      this.key = key;
+      this.encodedKey = encodedKey;
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof GroupingKey) {
+        GroupingKey<?> that = (GroupingKey<?>) o;
+        return Arrays.equals(this.encodedKey, that.encodedKey);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(encodedKey);
+    }
+  }
+
   private final DirectPipelineOptions options;
   private boolean testSerializability;
   private boolean testEncodability;
@@ -1153,4 +1197,76 @@ public class DirectPipelineRunner
   public String toString() {
     return "DirectPipelineRunner#" + hashCode();
   }
+
+  public static <K, V> void evaluateGroupByKeyOnly(
+      GroupByKeyOnly<K, V> transform,
+      EvaluationContext context) {
+    PCollection<KV<K, V>> input = context.getInput(transform);
+
+    List<ValueWithMetadata<KV<K, V>>> inputElems =
+        context.getPCollectionValuesWithMetadata(input);
+
+    Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
+
+    Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
+
+    for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
+      K key = elem.getValue().getKey();
+      V value = elem.getValue().getValue();
+      byte[] encodedKey;
+      try {
+        encodedKey = encodeToByteArray(keyCoder, key);
+      } catch (CoderException exn) {
+        // TODO: Put in better element printing:
+        // truncate if too long.
+        throw new IllegalArgumentException(
+            "unable to encode key " + key + " of input to " + transform +
+            " using " + keyCoder,
+            exn);
+      }
+      GroupingKey<K> groupingKey =
+          new GroupingKey<>(key, encodedKey);
+      List<V> values = groupingMap.get(groupingKey);
+      if (values == null) {
+        values = new ArrayList<V>();
+        groupingMap.put(groupingKey, values);
+      }
+      values.add(value);
+    }
+
+    List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
+        new ArrayList<>();
+    for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
+      GroupingKey<K> groupingKey = entry.getKey();
+      K key = groupingKey.getKey();
+      List<V> values = entry.getValue();
+      values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
+      outputElems.add(ValueWithMetadata
+                      .of(WindowedValue.valueInEmptyWindows(KV.<K, 
Iterable<V>>of(key, values)))
+                      .withKey(key));
+    }
+
+    context.setPCollectionValuesWithMetadata(context.getOutput(transform),
+                                             outputElems);
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public
+  static <K, V> void registerGroupByKeyOnly() {
+    registerDefaultTransformEvaluator(
+        GroupByKeyOnly.class,
+        new TransformEvaluator<GroupByKeyOnly>() {
+          @Override
+          public void evaluate(
+              GroupByKeyOnly transform,
+              EvaluationContext context) {
+            evaluateGroupByKeyOnly(transform, context);
+          }
+        });
+  }
+
+  static {
+    registerGroupByKeyOnly();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index 3ec4af1..4f97db0 100644
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -27,11 +27,11 @@ import 
com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Build
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import 
com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
+import 
com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
index 8fde3e0..490269b 100644
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
+++ 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
@@ -16,40 +16,20 @@
 
 package com.google.cloud.dataflow.sdk.transforms;
 
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.coders.IterableCoder;
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import 
com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import 
com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>},
  * groups the values by key and windows, and returns a
@@ -234,34 +214,12 @@ public class GroupByKey<K, V>
 
   @Override
   public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    // This operation groups by the combination of key and window,
+    // This primitive operation groups by the combination of key and window,
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the
     // window function associated with the input PCollection.
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-    // By default, implement GroupByKey[AndWindow] via a series of lower-level
-    // operations.
-    return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
-
-        // Group by just the key.
-        // Combiner lifting will not happen regardless of the 
disallowCombinerLifting value.
-        // There will be no combiners right after the GroupByKeyOnly because 
of the two ParDos
-        // introduced in here.
-        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
-        // Sort each key's values by timestamp. GroupAlsoByWindow requires
-        // its input to be sorted by timestamp.
-        .apply(new SortValuesByTimestamp<K, V>())
-
-        // Group each key's values by window, merging windows as needed.
-        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
-
-        // And update the windowing strategy as appropriate.
-        
.setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy));
+    return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+        updateWindowingStrategy(input.getWindowingStrategy()), 
input.isBounded());
   }
 
   @Override
@@ -289,7 +247,7 @@ public class GroupByKey<K, V>
    * transform, which is also used as the {@code Coder} of the keys of
    * the output of this transform.
    */
-  static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
+  public static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
     return getInputKvCoder(inputCoder).getKeyCoder();
   }
 
@@ -311,265 +269,7 @@ public class GroupByKey<K, V>
   /**
    * Returns the {@code Coder} of the output of this transform.
    */
-  static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> 
inputCoder) {
+  public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, 
V>> inputCoder) {
     return KvCoder.of(getKeyCoder(inputCoder), 
getOutputValueCoder(inputCoder));
   }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that makes timestamps and window assignments
-   * explicit in the value part of each key/value pair.
-   */
-  public static class ReifyTimestampsAndWindows<K, V>
-      extends PTransform<PCollection<KV<K, V>>,
-                         PCollection<KV<K, WindowedValue<V>>>> {
-    @Override
-    public PCollection<KV<K, WindowedValue<V>>> apply(
-        PCollection<KV<K, V>> input) {
-      @SuppressWarnings("unchecked")
-      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
-      Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of(
-          inputValueCoder, 
input.getWindowingStrategy().getWindowFn().windowCoder());
-      Coder<KV<K, WindowedValue<V>>> outputKvCoder =
-          KvCoder.of(keyCoder, outputValueCoder);
-      return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
-          .setCoder(outputKvCoder);
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that sorts the values associated with each key
-   * by timestamp.
-   */
-  public static class SortValuesByTimestamp<K, V>
-      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-                         PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
-    @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      return input.apply(ParDo.of(
-          new DoFn<KV<K, Iterable<WindowedValue<V>>>,
-                   KV<K, Iterable<WindowedValue<V>>>>() {
-            @Override
-            public void processElement(ProcessContext c) {
-              KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
-              K key = kvs.getKey();
-              Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
-              List<WindowedValue<V>> sortedValues = new ArrayList<>();
-              for (WindowedValue<V> value : unsortedValues) {
-                sortedValues.add(value);
-              }
-              Collections.sort(sortedValues,
-                               new Comparator<WindowedValue<V>>() {
-                  @Override
-                  public int compare(WindowedValue<V> e1, WindowedValue<V> e2) 
{
-                    return e1.getTimestamp().compareTo(e2.getTimestamp());
-                  }
-                });
-              c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, 
sortedValues));
-            }}))
-          .setCoder(input.getCoder());
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that takes a collection of timestamp-ordered
-   * values associated with each key, groups the values by window,
-   * combines windows as needed, and for each window in each key,
-   * outputs a collection of key/value-list pairs implicitly assigned
-   * to the window and with the timestamp derived from that window.
-   */
-  public static class GroupAlsoByWindow<K, V>
-      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-                         PCollection<KV<K, Iterable<V>>>> {
-    private final WindowingStrategy<?, ?> windowingStrategy;
-
-    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public PCollection<KV<K, Iterable<V>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      @SuppressWarnings("unchecked")
-      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
-          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<Iterable<WindowedValue<V>>> inputValueCoder =
-          inputKvCoder.getValueCoder();
-
-      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
-          (IterableCoder<WindowedValue<V>>) inputValueCoder;
-      Coder<WindowedValue<V>> inputIterableElementCoder =
-          inputIterableValueCoder.getElemCoder();
-      WindowedValueCoder<V> inputIterableWindowedValueCoder =
-          (WindowedValueCoder<V>) inputIterableElementCoder;
-
-      Coder<V> inputIterableElementValueCoder =
-          inputIterableWindowedValueCoder.getValueCoder();
-      Coder<Iterable<V>> outputValueCoder =
-          IterableCoder.of(inputIterableElementValueCoder);
-      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, 
outputValueCoder);
-
-      return input
-          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, 
inputIterableElementValueCoder)))
-          .setCoder(outputKvCoder);
-    }
-
-    private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, 
V, Iterable<V>, W>
-        groupAlsoByWindowsFn(
-            WindowingStrategy<?, W> strategy, Coder<V> 
inputIterableElementValueCoder) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
-          strategy, SystemReduceFn.<K, V, 
W>buffering(inputIterableElementValueCoder));
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Primitive helper transform that groups by key only, ignoring any
-   * window assignments.
-   */
-  public static class GroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, V>>,
-                         PCollection<KV<K, Iterable<V>>>> {
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded());
-    }
-
-    /**
-     * Returns the {@code Coder} of the input to this transform, which
-     * should be a {@code KvCoder}.
-     */
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
-      if (!(inputCoder instanceof KvCoder)) {
-        throw new IllegalStateException(
-            "GroupByKey requires its input to use KvCoder");
-      }
-      return (KvCoder<K, V>) inputCoder;
-    }
-
-    @Override
-    protected Coder<KV<K, Iterable<V>>> 
getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-      return GroupByKey.getOutputKvCoder(input.getCoder());
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    registerWithDirectPipelineRunner();
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private static <K, V> void registerWithDirectPipelineRunner() {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        GroupByKeyOnly.class,
-        new DirectPipelineRunner.TransformEvaluator<GroupByKeyOnly>() {
-          @Override
-          public void evaluate(
-              GroupByKeyOnly transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  private static <K, V> void evaluateHelper(
-      GroupByKeyOnly<K, V> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    PCollection<KV<K, V>> input = context.getInput(transform);
-
-    List<ValueWithMetadata<KV<K, V>>> inputElems =
-        context.getPCollectionValuesWithMetadata(input);
-
-    Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
-    Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
-    for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
-      K key = elem.getValue().getKey();
-      V value = elem.getValue().getValue();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            "unable to encode key " + key + " of input to " + transform +
-            " using " + keyCoder,
-            exn);
-      }
-      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      List<V> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<V>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(value);
-    }
-
-    List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
-        new ArrayList<>();
-    for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
-      GroupingKey<K> groupingKey = entry.getKey();
-      K key = groupingKey.getKey();
-      List<V> values = entry.getValue();
-      values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
-      outputElems.add(ValueWithMetadata
-                      .of(WindowedValue.valueInEmptyWindows(KV.<K, 
Iterable<V>>of(key, values)))
-                      .withKey(key));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform),
-                                             outputElems);
-  }
-
-  private static class GroupingKey<K> {
-    private K key;
-    private byte[] encodedKey;
-
-    public GroupingKey(K key, byte[] encodedKey) {
-      this.key = key;
-      this.encodedKey = encodedKey;
-    }
-
-    public K getKey() {
-      return key;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof GroupingKey) {
-        GroupingKey<?> that = (GroupingKey<?>) o;
-        return Arrays.equals(this.encodedKey, that.encodedKey);
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return Arrays.hashCode(encodedKey);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
new file mode 100644
index 0000000..c331931
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.IterableCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * An implementation of {@link GroupByKey} built on top of a lower-level 
{@link GroupByKeyOnly}
+ * primitive.
+ *
+ * <p>This implementation of {@link GroupByKey} proceeds via the following 
steps:
+ * <ol>
+ *   <li>{@code ReifyTimestampsAndWindowsDoFn 
ParDo(ReifyTimestampsAndWindows)}: This embeds
+ *       the previously-implicit timestamp and window into the elements 
themselves, so a
+ *       window-and-timestamp-unaware transform can operate on them.</li>
+ *   <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, 
ignoring windows
+ *       and timestamps. Many window-unaware runners have such a primitive 
already.</li>
+ *   <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The 
values in the iterables
+ *       output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
+ *   <li>{@code GroupAlsoByWindow}: This primitive processes the sorted 
values. Today it is
+ *       implemented as a {@link ParDo} that calls reserved internal 
methods.</li>
+ * </ol>
+ *
+ * <p>This implementation of {@link GroupByKey} has severe limitations unless 
its component
+ * transforms are replaced. As-is, it is only applicable for in-memory runners 
using a batch-style
+ * execution strategy. Specifically:
+ *
+ * <ul>
+ *   <li>Every iterable output by {@link GroupByKeyOnly} must contain all 
elements for that key.
+ *       A streaming-style partition, with multiple elements for the same key, 
will not yield
+ *       correct results.</li>
+ *   <li>Sorting of values by timestamp is performed on an in-memory list. It 
will not succeed
+ *       for large iterables.</li>
+ *   <li>The implementation of {@code GroupAlsoByWindow} does not support 
timers. This is only
+ *       appropriate for runners which also do not support timers.</li>
+ * </ul>
+ */
+public class GroupByKeyViaGroupByKeyOnly<K, V>
+    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> 
{
+
+  private GroupByKey<K, V> gbkTransform;
+
+  public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
+    this.gbkTransform = originalTransform;
+  }
+
+  @Override
+  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+    return input
+        // Make each input element's timestamp and assigned windows
+        // explicit, in the value part.
+        .apply(new ReifyTimestampsAndWindows<K, V>())
+
+        // Group by just the key.
+        // Combiner lifting will not happen regardless of the 
disallowCombinerLifting value.
+        // There will be no combiners right after the GroupByKeyOnly because 
of the two ParDos
+        // introduced in here.
+        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+        // Sort each key's values by timestamp. GroupAlsoByWindow requires
+        // its input to be sorted by timestamp.
+        .apply(new SortValuesByTimestamp<K, V>())
+
+        // Group each key's values by window, merging windows as needed.
+        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+        // And update the windowing strategy as appropriate.
+        .setWindowingStrategyInternal(
+            gbkTransform.updateWindowingStrategy(windowingStrategy));
+  }
+
+  /**
+   * Runner-specific primitive that groups by key only, ignoring any window 
assignments. A
+   * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a 
primitive way to translate
+   * or evaluate this class.
+   */
+  public static class GroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded());
+    }
+
+    @Override
+    public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, 
V>> input) {
+      return GroupByKey.getOutputKvCoder(input.getCoder());
+    }
+  }
+
+  /**
+   * Helper transform that makes timestamps and window assignments
+   * explicit in the value part of each key/value pair.
+   */
+  public static class ReifyTimestampsAndWindows<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
WindowedValue<V>>>> {
+
+    @Override
+    public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> 
input) {
+
+      // The requirement to use a KvCoder *is* actually a model-level 
requirement, not specific
+      // to this implementation of GBK. All runners need a way to get the key.
+      checkArgument(input.getCoder() instanceof KvCoder,
+          "%s requires its input to use a %s",
+          GroupByKey.class.getSimpleName(),
+          KvCoder.class.getSimpleName());
+
+      @SuppressWarnings("unchecked")
+      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
+      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+      Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+      Coder<WindowedValue<V>> outputValueCoder =
+          FullWindowedValueCoder.of(
+              inputValueCoder, 
input.getWindowingStrategy().getWindowFn().windowCoder());
+      Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, 
outputValueCoder);
+      return input
+          .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
+          .setCoder(outputKvCoder);
+    }
+  }
+
+  /**
+   * Helper transform that sorts the values associated with each key by 
timestamp.
+   */
+  private static class SortValuesByTimestamp<K, V>
+      extends PTransform<
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+    @Override
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, 
Iterable<WindowedValue<V>>>>() {
+                    @Override
+                    public void processElement(ProcessContext c) {
+                      KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
+                      K key = kvs.getKey();
+                      Iterable<WindowedValue<V>> unsortedValues = 
kvs.getValue();
+                      List<WindowedValue<V>> sortedValues = new ArrayList<>();
+                      for (WindowedValue<V> value : unsortedValues) {
+                        sortedValues.add(value);
+                      }
+                      Collections.sort(
+                          sortedValues,
+                          new Comparator<WindowedValue<V>>() {
+                            @Override
+                            public int compare(WindowedValue<V> e1, 
WindowedValue<V> e2) {
+                              return 
e1.getTimestamp().compareTo(e2.getTimestamp());
+                            }
+                          });
+                      c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, 
sortedValues));
+                    }
+                  }))
+          .setCoder(input.getCoder());
+    }
+  }
+
+  /**
+   * Helper transform that takes a collection of timestamp-ordered
+   * values associated with each key, groups the values by window,
+   * combines windows as needed, and for each window in each key,
+   * outputs a collection of key/value-list pairs implicitly assigned
+   * to the window and with the timestamp derived from that window.
+   */
+  private static class GroupAlsoByWindow<K, V>
+      extends PTransform<
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, 
Iterable<V>>>> {
+    private final WindowingStrategy<?, ?> windowingStrategy;
+
+    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<KV<K, Iterable<V>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      @SuppressWarnings("unchecked")
+      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
+
+      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+      Coder<Iterable<WindowedValue<V>>> inputValueCoder = 
inputKvCoder.getValueCoder();
+
+      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+          (IterableCoder<WindowedValue<V>>) inputValueCoder;
+      Coder<WindowedValue<V>> inputIterableElementCoder = 
inputIterableValueCoder.getElemCoder();
+      WindowedValueCoder<V> inputIterableWindowedValueCoder =
+          (WindowedValueCoder<V>) inputIterableElementCoder;
+
+      Coder<V> inputIterableElementValueCoder = 
inputIterableWindowedValueCoder.getValueCoder();
+      Coder<Iterable<V>> outputValueCoder = 
IterableCoder.of(inputIterableElementValueCoder);
+      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, 
outputValueCoder);
+
+      return input
+          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, 
inputIterableElementValueCoder)))
+          .setCoder(outputKvCoder);
+    }
+
+    private <W extends BoundedWindow>
+        GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> 
groupAlsoByWindowsFn(
+            WindowingStrategy<?, W> strategy, Coder<V> 
inputIterableElementValueCoder) {
+      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+          strategy, SystemReduceFn.<K, V, 
W>buffering(inputIterableElementValueCoder));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 560d8ec..483e8c0 100644
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -18,7 +18,6 @@ package com.google.cloud.dataflow.sdk.util;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.Aggregator;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index 4ced82f..9933ec1 100644
--- 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -26,7 +26,7 @@ import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.C
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import 
com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
@@ -60,7 +60,7 @@ public class GroupByKeyEvaluatorFactoryTest {
     PCollection<KV<String, Integer>> values =
         p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, 
thirdFoo));
     PCollection<KV<String, WindowedValue<Integer>>> kvs =
-        values.apply(new GroupByKey.ReifyTimestampsAndWindows<String, 
Integer>());
+        values.apply(new ReifyTimestampsAndWindows<String, Integer>());
     PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
         kvs.apply(new 
GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
 

Reply via email to