Port DirectGroupByKey to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02dbaefd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02dbaefd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02dbaefd Branch: refs/heads/gearpump-runner Commit: 02dbaefd2bbad0f0ff0b87469d184137b220fae7 Parents: 8c5b57e Author: Kenneth Knowles <k...@google.com> Authored: Fri May 26 14:27:23 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Jun 9 19:56:52 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/direct/DirectGroupByKey.java | 13 +++++++------ .../direct/DirectGroupByKeyOverrideFactory.java | 14 +++++++++++--- 2 files changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 2fc0dd4..06b8e29 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -36,13 +36,17 @@ import org.apache.beam.sdk.values.WindowingStrategy; class DirectGroupByKey<K, V> extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { - private final GroupByKey<K, V> original; + private final PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original; static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1"; static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1"; + private final WindowingStrategy<?, ?> outputWindowingStrategy; - DirectGroupByKey(GroupByKey<K, V> from) { - this.original = from; + DirectGroupByKey( + PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original, + WindowingStrategy<?, ?> outputWindowingStrategy) { + this.original = original; + this.outputWindowingStrategy = outputWindowingStrategy; } @Override @@ -57,9 +61,6 @@ class DirectGroupByKey<K, V> // key/value input elements and the window merge operation of the // window function associated with the input PCollection. WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy(); - // Update the windowing strategy as appropriate. - WindowingStrategy<?, ?> outputWindowingStrategy = - original.updateWindowingStrategy(inputWindowingStrategy); // By default, implement GroupByKey via a series of lower-level operations. return input http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index c2eb5e7..9c2de3d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -17,26 +17,34 @@ */ package org.apache.beam.runners.direct; +import com.google.common.collect.Iterables; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */ final class DirectGroupByKeyOverrideFactory<K, V> extends SingleInputOutputOverrideFactory< - PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> { + PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, + PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> { @Override public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> getReplacementTransform( AppliedPTransform< - PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> + PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, + PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> transform) { + + PCollection<KV<K, Iterable<V>>> output = + (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(transform.getOutputs().values()); + return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new DirectGroupByKey<>(transform.getTransform())); + new DirectGroupByKey<>(transform.getTransform(), output.getWindowingStrategy())); } }