Repository: incubator-beam Updated Branches: refs/heads/master 6c34f3a34 -> c26eef5be
Move GroupByKey expansion into DirectPipelineRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca5b2def Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca5b2def Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca5b2def Branch: refs/heads/master Commit: ca5b2def2bf77bd841fc670b167fe7ba37801603 Parents: 706fc53 Author: Kenneth Knowles <k...@google.com> Authored: Thu Mar 24 15:26:37 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Apr 5 13:54:03 2016 -0700 ---------------------------------------------------------------------- .../FlinkBatchTransformTranslators.java | 26 +- .../beam/runners/spark/SparkPipelineRunner.java | 20 ++ .../spark/translation/TransformTranslator.java | 22 +- .../sdk/runners/DirectPipelineRunner.java | 116 +++++++ .../inprocess/GroupByKeyEvaluatorFactory.java | 2 +- .../dataflow/sdk/transforms/GroupByKey.java | 310 +------------------ .../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 245 +++++++++++++++ .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 1 - .../GroupByKeyEvaluatorFactoryTest.java | 4 +- 9 files changed, 404 insertions(+), 342 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index 48c783d..b3c0cea 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -96,7 +96,7 @@ public class FlinkBatchTransformTranslators { // -------------------------------------------------------------------------------------------- // Transform Translator Registry // -------------------------------------------------------------------------------------------- - + @SuppressWarnings("rawtypes") private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>(); @@ -112,7 +112,6 @@ public class FlinkBatchTransformTranslators { TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch()); - TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch()); // TODO we're currently ignoring windows here but that has to change in the future TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch()); @@ -302,25 +301,8 @@ public class FlinkBatchTransformTranslators { } } - private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> { - - @Override - public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) { - DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>(); - - TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform)); - - Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType())); - - GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet = - new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName()); - 
context.setOutputDataSet(context.getOutput(transform), outputDataSet); - } - } - /** - * Translates a GroupByKey while ignoring window assignments. This is identical to the {@link GroupByKeyOnlyTranslatorBatch} + * Translates a GroupByKey while ignoring window assignments. Currently ignores windows. */ private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> { @@ -406,7 +388,7 @@ public class FlinkBatchTransformTranslators { // context.setOutputDataSet(transform.getOutput(), outputDataSet); // } // } - + private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> { private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class); @@ -589,6 +571,6 @@ public class FlinkBatchTransformTranslators { // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- - + private FlinkBatchTransformTranslators() {} } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java index d5e4186..71e358c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java @@ -23,7 +23,10 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import
com.google.cloud.dataflow.sdk.runners.TransformTreeNode; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly; +import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PInput; import com.google.cloud.dataflow.sdk.values.POutput; import com.google.cloud.dataflow.sdk.values.PValue; @@ -105,6 +108,23 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult> } /** + * Overrides for this runner. + */ + @SuppressWarnings("rawtypes") + @Override + public <OT extends POutput, IT extends PInput> OT apply( + PTransform<IT, OT> transform, IT input) { + + if (transform instanceof GroupByKey) { + return (OT) ((PCollection) input).apply( + new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); + } else { + return super.apply(transform, input); + } + } + + + /** * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single * thread. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 7f72235..ac59d00 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -18,11 +18,6 @@ package org.apache.beam.runners.spark.translation; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; - import java.io.IOException; import java.lang.reflect.Field; import java.util.Arrays; @@ -30,7 +25,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import com.google.api.client.util.Lists; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; + import com.google.api.client.util.Maps; import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; @@ -41,7 +40,6 @@ import com.google.cloud.dataflow.sdk.transforms.Combine; import 
com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View; @@ -50,6 +48,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn; +import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -82,6 +81,7 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; + import scala.Tuple2; /** @@ -130,10 +130,10 @@ public final class TransformTranslator { }; } - private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> gbk() { - return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() { + private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbk() { + return new TransformEvaluator<GroupByKeyOnly<K, V>>() { @Override - public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) { + public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) { @SuppressWarnings("unchecked") JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD = (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform); @@ -776,7 +776,7 @@ public final class TransformTranslator { EVALUATORS.put(HadoopIO.Write.Bound.class, 
writeHadoop()); EVALUATORS.put(ParDo.Bound.class, parDo()); EVALUATORS.put(ParDo.BoundMulti.class, multiDo()); - EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk()); + EVALUATORS.put(GroupByKeyOnly.class, gbk()); EVALUATORS.put(Combine.GroupedValues.class, grouped()); EVALUATORS.put(Combine.Globally.class, combineGlobally()); EVALUATORS.put(Combine.PerKey.class, combinePerKey()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java index 872cfef..417420a 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java @@ -16,6 +16,7 @@ package com.google.cloud.dataflow.sdk.runners; +import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -24,6 +25,7 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor; import com.google.cloud.dataflow.sdk.PipelineResult; import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.coders.ListCoder; import com.google.cloud.dataflow.sdk.io.AvroIO; import com.google.cloud.dataflow.sdk.io.FileBasedSink; @@ -38,12 +40,15 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Combine; import 
com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn; import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.Partition; import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.AppliedCombineFn; +import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly; +import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.MapAggregatorValues; import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner; @@ -71,6 +76,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -138,6 +144,8 @@ public class DirectPipelineRunner } } + ///////////////////////////////////////////////////////////////////////////// + /** * Records that instances of the specified PTransform class * should be evaluated by the corresponding TransformEvaluator. 
@@ -243,6 +251,9 @@ public class DirectPipelineRunner return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection<?>) input); } else if (transform instanceof AvroIO.Write.Bound) { return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input); + } else if (transform instanceof GroupByKey) { + return (OutputT) + ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); } else { return super.apply(transform, input); } @@ -1117,6 +1128,39 @@ public class DirectPipelineRunner ///////////////////////////////////////////////////////////////////////////// + /** + * The key by which GBK groups inputs - elements are grouped by the encoded form of the key, + * but the original key may be accessed as well. + */ + private static class GroupingKey<K> { + private K key; + private byte[] encodedKey; + + public GroupingKey(K key, byte[] encodedKey) { + this.key = key; + this.encodedKey = encodedKey; + } + + public K getKey() { + return key; + } + + @Override + public boolean equals(Object o) { + if (o instanceof GroupingKey) { + GroupingKey<?> that = (GroupingKey<?>) o; + return Arrays.equals(this.encodedKey, that.encodedKey); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Arrays.hashCode(encodedKey); + } + } + private final DirectPipelineOptions options; private boolean testSerializability; private boolean testEncodability; @@ -1153,4 +1197,76 @@ public class DirectPipelineRunner public String toString() { return "DirectPipelineRunner#" + hashCode(); } + + public static <K, V> void evaluateGroupByKeyOnly( + GroupByKeyOnly<K, V> transform, + EvaluationContext context) { + PCollection<KV<K, V>> input = context.getInput(transform); + + List<ValueWithMetadata<KV<K, V>>> inputElems = + context.getPCollectionValuesWithMetadata(input); + + Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder()); + + Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>(); + + for 
(ValueWithMetadata<KV<K, V>> elem : inputElems) { + K key = elem.getValue().getKey(); + V value = elem.getValue().getValue(); + byte[] encodedKey; + try { + encodedKey = encodeToByteArray(keyCoder, key); + } catch (CoderException exn) { + // TODO: Put in better element printing: + // truncate if too long. + throw new IllegalArgumentException( + "unable to encode key " + key + " of input to " + transform + + " using " + keyCoder, + exn); + } + GroupingKey<K> groupingKey = + new GroupingKey<>(key, encodedKey); + List<V> values = groupingMap.get(groupingKey); + if (values == null) { + values = new ArrayList<V>(); + groupingMap.put(groupingKey, values); + } + values.add(value); + } + + List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems = + new ArrayList<>(); + for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) { + GroupingKey<K> groupingKey = entry.getKey(); + K key = groupingKey.getKey(); + List<V> values = entry.getValue(); + values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */); + outputElems.add(ValueWithMetadata + .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values))) + .withKey(key)); + } + + context.setPCollectionValuesWithMetadata(context.getOutput(transform), + outputElems); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public + static <K, V> void registerGroupByKeyOnly() { + registerDefaultTransformEvaluator( + GroupByKeyOnly.class, + new TransformEvaluator<GroupByKeyOnly>() { + @Override + public void evaluate( + GroupByKeyOnly transform, + EvaluationContext context) { + evaluateGroupByKeyOnly(transform, context); + } + }); + } + + static { + registerGroupByKeyOnly(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java index 3ec4af1..4f97db0 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java @@ -27,11 +27,11 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Build import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn; +import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; import com.google.cloud.dataflow.sdk.util.KeyedWorkItem; import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder; import com.google.cloud.dataflow.sdk.util.KeyedWorkItems; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java index 8fde3e0..490269b 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java +++ 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java @@ -16,40 +16,20 @@ package com.google.cloud.dataflow.sdk.transforms; -import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray; - import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException; -import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.coders.IterableCoder; import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; -import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn; -import com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn; -import com.google.cloud.dataflow.sdk.util.SystemReduceFn; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder; -import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * {@code 
GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>}, * groups the values by key and windows, and returns a @@ -234,34 +214,12 @@ public class GroupByKey<K, V> @Override public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - // This operation groups by the combination of key and window, + // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the // window function associated with the input PCollection. - WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - - // By default, implement GroupByKey[AndWindow] via a series of lower-level - // operations. - return input - // Make each input element's timestamp and assigned windows - // explicit, in the value part. - .apply(new ReifyTimestampsAndWindows<K, V>()) - - // Group by just the key. - // Combiner lifting will not happen regardless of the disallowCombinerLifting value. - // There will be no combiners right after the GroupByKeyOnly because of the two ParDos - // introduced in here. - .apply(new GroupByKeyOnly<K, WindowedValue<V>>()) - - // Sort each key's values by timestamp. GroupAlsoByWindow requires - // its input to be sorted by timestamp. - .apply(new SortValuesByTimestamp<K, V>()) - - // Group each key's values by window, merging windows as needed. - .apply(new GroupAlsoByWindow<K, V>(windowingStrategy)) - - // And update the windowing strategy as appropriate. - .setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy)); + return PCollection.createPrimitiveOutputInternal(input.getPipeline(), + updateWindowingStrategy(input.getWindowingStrategy()), input.isBounded()); } @Override @@ -289,7 +247,7 @@ public class GroupByKey<K, V> * transform, which is also used as the {@code Coder} of the keys of * the output of this transform. 
*/ - static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) { + public static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) { return getInputKvCoder(inputCoder).getKeyCoder(); } @@ -311,265 +269,7 @@ public class GroupByKey<K, V> /** * Returns the {@code Coder} of the output of this transform. */ - static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) { + public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) { return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder)); } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Helper transform that makes timestamps and window assignments - * explicit in the value part of each key/value pair. - */ - public static class ReifyTimestampsAndWindows<K, V> - extends PTransform<PCollection<KV<K, V>>, - PCollection<KV<K, WindowedValue<V>>>> { - @Override - public PCollection<KV<K, WindowedValue<V>>> apply( - PCollection<KV<K, V>> input) { - @SuppressWarnings("unchecked") - KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder(); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - Coder<V> inputValueCoder = inputKvCoder.getValueCoder(); - Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of( - inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); - Coder<KV<K, WindowedValue<V>>> outputKvCoder = - KvCoder.of(keyCoder, outputValueCoder); - return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>())) - .setCoder(outputKvCoder); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Helper transform that sorts the values associated with each key - * by timestamp. 
- */ - public static class SortValuesByTimestamp<K, V> - extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>, - PCollection<KV<K, Iterable<WindowedValue<V>>>>> { - @Override - public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply( - PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { - return input.apply(ParDo.of( - new DoFn<KV<K, Iterable<WindowedValue<V>>>, - KV<K, Iterable<WindowedValue<V>>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, Iterable<WindowedValue<V>>> kvs = c.element(); - K key = kvs.getKey(); - Iterable<WindowedValue<V>> unsortedValues = kvs.getValue(); - List<WindowedValue<V>> sortedValues = new ArrayList<>(); - for (WindowedValue<V> value : unsortedValues) { - sortedValues.add(value); - } - Collections.sort(sortedValues, - new Comparator<WindowedValue<V>>() { - @Override - public int compare(WindowedValue<V> e1, WindowedValue<V> e2) { - return e1.getTimestamp().compareTo(e2.getTimestamp()); - } - }); - c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues)); - }})) - .setCoder(input.getCoder()); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Helper transform that takes a collection of timestamp-ordered - * values associated with each key, groups the values by window, - * combines windows as needed, and for each window in each key, - * outputs a collection of key/value-list pairs implicitly assigned - * to the window and with the timestamp derived from that window. 
- */ - public static class GroupAlsoByWindow<K, V> - extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>, - PCollection<KV<K, Iterable<V>>>> { - private final WindowingStrategy<?, ?> windowingStrategy; - - public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) { - this.windowingStrategy = windowingStrategy; - } - - @Override - @SuppressWarnings("unchecked") - public PCollection<KV<K, Iterable<V>>> apply( - PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { - @SuppressWarnings("unchecked") - KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder = - (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder(); - - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - Coder<Iterable<WindowedValue<V>>> inputValueCoder = - inputKvCoder.getValueCoder(); - - IterableCoder<WindowedValue<V>> inputIterableValueCoder = - (IterableCoder<WindowedValue<V>>) inputValueCoder; - Coder<WindowedValue<V>> inputIterableElementCoder = - inputIterableValueCoder.getElemCoder(); - WindowedValueCoder<V> inputIterableWindowedValueCoder = - (WindowedValueCoder<V>) inputIterableElementCoder; - - Coder<V> inputIterableElementValueCoder = - inputIterableWindowedValueCoder.getValueCoder(); - Coder<Iterable<V>> outputValueCoder = - IterableCoder.of(inputIterableElementValueCoder); - Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); - - return input - .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder))) - .setCoder(outputKvCoder); - } - - private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> - groupAlsoByWindowsFn( - WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) { - return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( - strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Primitive helper 
transform that groups by key only, ignoring any - * window assignments. - */ - public static class GroupByKeyOnly<K, V> - extends PTransform<PCollection<KV<K, V>>, - PCollection<KV<K, Iterable<V>>>> { - - @SuppressWarnings({"rawtypes", "unchecked"}) - @Override - public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); - } - - /** - * Returns the {@code Coder} of the input to this transform, which - * should be a {@code KvCoder}. - */ - @SuppressWarnings("unchecked") - KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) { - if (!(inputCoder instanceof KvCoder)) { - throw new IllegalStateException( - "GroupByKey requires its input to use KvCoder"); - } - return (KvCoder<K, V>) inputCoder; - } - - @Override - protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) { - return GroupByKey.getOutputKvCoder(input.getCoder()); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - - static { - registerWithDirectPipelineRunner(); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - private static <K, V> void registerWithDirectPipelineRunner() { - DirectPipelineRunner.registerDefaultTransformEvaluator( - GroupByKeyOnly.class, - new DirectPipelineRunner.TransformEvaluator<GroupByKeyOnly>() { - @Override - public void evaluate( - GroupByKeyOnly transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateHelper(transform, context); - } - }); - } - - private static <K, V> void evaluateHelper( - GroupByKeyOnly<K, V> transform, - DirectPipelineRunner.EvaluationContext context) { - PCollection<KV<K, V>> input = context.getInput(transform); - - List<ValueWithMetadata<KV<K, V>>> inputElems = - context.getPCollectionValuesWithMetadata(input); - - Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder()); - - Map<GroupingKey<K>, 
List<V>> groupingMap = new HashMap<>(); - - for (ValueWithMetadata<KV<K, V>> elem : inputElems) { - K key = elem.getValue().getKey(); - V value = elem.getValue().getValue(); - byte[] encodedKey; - try { - encodedKey = encodeToByteArray(keyCoder, key); - } catch (CoderException exn) { - // TODO: Put in better element printing: - // truncate if too long. - throw new IllegalArgumentException( - "unable to encode key " + key + " of input to " + transform + - " using " + keyCoder, - exn); - } - GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey); - List<V> values = groupingMap.get(groupingKey); - if (values == null) { - values = new ArrayList<V>(); - groupingMap.put(groupingKey, values); - } - values.add(value); - } - - List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems = - new ArrayList<>(); - for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) { - GroupingKey<K> groupingKey = entry.getKey(); - K key = groupingKey.getKey(); - List<V> values = entry.getValue(); - values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */); - outputElems.add(ValueWithMetadata - .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values))) - .withKey(key)); - } - - context.setPCollectionValuesWithMetadata(context.getOutput(transform), - outputElems); - } - - private static class GroupingKey<K> { - private K key; - private byte[] encodedKey; - - public GroupingKey(K key, byte[] encodedKey) { - this.key = key; - this.encodedKey = encodedKey; - } - - public K getKey() { - return key; - } - - @Override - public boolean equals(Object o) { - if (o instanceof GroupingKey) { - GroupingKey<?> that = (GroupingKey<?>) o; - return Arrays.equals(this.encodedKey, that.encodedKey); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Arrays.hashCode(encodedKey); - } - } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java new file mode 100644 index 0000000..c331931 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java @@ -0,0 +1,245 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.IterableCoder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder; +import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly} + * primitive. + * + * <p>This implementation of {@link GroupByKey} proceeds via the following steps: + * <ol> + * <li>{@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds + * the previously-implicit timestamp and window into the elements themselves, so a + * window-and-timestamp-unaware transform can operate on them.</li> + * <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows + * and timestamps. Many window-unaware runners have such a primitive already.</li> + * <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables + * output by {@link GroupByKeyOnly} are sorted by timestamp.</li> + * <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. 
Today it is + * implemented as a {@link ParDo} that calls reserved internal methods.</li> + * </ol> + * + * <p>This implementation of {@link GroupByKey} has severe limitations unless its component + * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style + * execution strategy. Specifically: + * + * <ul> + * <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key. + * A streaming-style partition, with multiple elements for the same key, will not yield + * correct results.</li> + * <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed + * for large iterables.</li> + * <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only + * appropriate for runners which also do not support timers.</li> + * </ul> + */ +public class GroupByKeyViaGroupByKeyOnly<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { + + private GroupByKey<K, V> gbkTransform; + + public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) { + this.gbkTransform = originalTransform; + } + + @Override + public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { + WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); + + return input + // Make each input element's timestamp and assigned windows + // explicit, in the value part. + .apply(new ReifyTimestampsAndWindows<K, V>()) + + // Group by just the key. + // Combiner lifting will not happen regardless of the disallowCombinerLifting value. + // There will be no combiners right after the GroupByKeyOnly because of the two ParDos + // introduced in here. + .apply(new GroupByKeyOnly<K, WindowedValue<V>>()) + + // Sort each key's values by timestamp. GroupAlsoByWindow requires + // its input to be sorted by timestamp. + .apply(new SortValuesByTimestamp<K, V>()) + + // Group each key's values by window, merging windows as needed. 
+ .apply(new GroupAlsoByWindow<K, V>(windowingStrategy)) + + // And update the windowing strategy as appropriate. + .setWindowingStrategyInternal( + gbkTransform.updateWindowingStrategy(windowingStrategy)); + } + + /** + * Runner-specific primitive that groups by key only, ignoring any window assignments. A + * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate + * or evaluate this class. + */ + public static class GroupByKeyOnly<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { + return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } + + @Override + public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) { + return GroupByKey.getOutputKvCoder(input.getCoder()); + } + } + + /** + * Helper transform that makes timestamps and window assignments + * explicit in the value part of each key/value pair. + */ + public static class ReifyTimestampsAndWindows<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> { + + @Override + public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) { + + // The requirement to use a KvCoder *is* actually a model-level requirement, not specific + // to this implementation of GBK. All runners need a way to get the key. 
+ checkArgument(input.getCoder() instanceof KvCoder, + "%s requires its input to use a %s", + GroupByKey.class.getSimpleName(), + KvCoder.class.getSimpleName()); + + @SuppressWarnings("unchecked") + KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder(); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + Coder<V> inputValueCoder = inputKvCoder.getValueCoder(); + Coder<WindowedValue<V>> outputValueCoder = + FullWindowedValueCoder.of( + inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); + Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); + return input + .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>())) + .setCoder(outputKvCoder); + } + } + + /** + * Helper transform that sorts the values associated with each key by timestamp. + */ + private static class SortValuesByTimestamp<K, V> + extends PTransform< + PCollection<KV<K, Iterable<WindowedValue<V>>>>, + PCollection<KV<K, Iterable<WindowedValue<V>>>>> { + @Override + public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply( + PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { + return input + .apply( + ParDo.of( + new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, Iterable<WindowedValue<V>>> kvs = c.element(); + K key = kvs.getKey(); + Iterable<WindowedValue<V>> unsortedValues = kvs.getValue(); + List<WindowedValue<V>> sortedValues = new ArrayList<>(); + for (WindowedValue<V> value : unsortedValues) { + sortedValues.add(value); + } + Collections.sort( + sortedValues, + new Comparator<WindowedValue<V>>() { + @Override + public int compare(WindowedValue<V> e1, WindowedValue<V> e2) { + return e1.getTimestamp().compareTo(e2.getTimestamp()); + } + }); + c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues)); + } + })) + .setCoder(input.getCoder()); + } + } + + /** + * Helper transform that takes a collection of 
timestamp-ordered + * values associated with each key, groups the values by window, + * combines windows as needed, and for each window in each key, + * outputs a collection of key/value-list pairs implicitly assigned + * to the window and with the timestamp derived from that window. + */ + private static class GroupAlsoByWindow<K, V> + extends PTransform< + PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> { + private final WindowingStrategy<?, ?> windowingStrategy; + + public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) { + this.windowingStrategy = windowingStrategy; + } + + @Override + @SuppressWarnings("unchecked") + public PCollection<KV<K, Iterable<V>>> apply( + PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { + @SuppressWarnings("unchecked") + KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder = + (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder(); + + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder(); + + IterableCoder<WindowedValue<V>> inputIterableValueCoder = + (IterableCoder<WindowedValue<V>>) inputValueCoder; + Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder(); + WindowedValueCoder<V> inputIterableWindowedValueCoder = + (WindowedValueCoder<V>) inputIterableElementCoder; + + Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder(); + Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder); + Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); + + return input + .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder))) + .setCoder(outputKvCoder); + } + + private <W extends BoundedWindow> + GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn( + WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) { + 
return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( + strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 560d8ec..483e8c0 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -18,7 +18,6 @@ package com.google.cloud.dataflow.sdk.util; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java index 4ced82f..9933ec1 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java @@ -26,7 +26,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.C import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; import com.google.cloud.dataflow.sdk.util.KeyedWorkItem; import com.google.cloud.dataflow.sdk.util.KeyedWorkItems; import com.google.cloud.dataflow.sdk.util.WindowedValue; @@ -60,7 +60,7 @@ public class GroupByKeyEvaluatorFactoryTest { PCollection<KV<String, Integer>> values = p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); PCollection<KV<String, WindowedValue<Integer>>> kvs = - values.apply(new GroupByKey.ReifyTimestampsAndWindows<String, Integer>()); + values.apply(new ReifyTimestampsAndWindows<String, Integer>()); PCollection<KeyedWorkItem<String, Integer>> groupedKvs = kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());