Return the coder-encoded key (wrapped in a ByteBuffer) from GroupByFn for GroupByKey translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/364a3f08 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/364a3f08 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/364a3f08 Branch: refs/heads/gearpump-runner Commit: 364a3f089747ff4761cb5b54c963c8a8013574a0 Parents: f6aaf0d Author: manuzhang <owenzhang1...@gmail.com> Authored: Mon Jan 16 11:16:05 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Mon Jan 16 11:16:05 2017 +0800 ---------------------------------------------------------------------- .../translators/GroupByKeyTranslator.java | 24 ++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/364a3f08/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index e16a178..ac8e218 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -22,17 +22,22 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.io.Serializable; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.Collection; import java.util.LinkedList; import java.util.List; import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; import 
org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -56,6 +61,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe @Override public void translate(GroupByKey<K, V> transform, TranslationContext context) { PCollection<KV<K, V>> input = context.getInput(transform); + Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder(); JavaStream<WindowedValue<KV<K, V>>> inputStream = context.getInputStream(input); int parallelism = context.getPipelineOptions().getParallelism(); @@ -64,7 +70,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()), EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window") - .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window") + .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window") .map(new ValueToIterable<K, V>(), "map_value_to_iterable") .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp") .reduce(new Merge<K, V>(outputTimeFn), "merge") @@ -128,11 +134,21 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe } private static class GroupByFn<K, V> extends - GroupByFunction<WindowedValue<KV<K, V>>, K> { + GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> { + + private final Coder<K> keyCoder; + + GroupByFn(Coder<K> keyCoder) { + this.keyCoder = keyCoder; + } @Override - public K apply(WindowedValue<KV<K, V>> wv) { - return wv.getValue().getKey(); + 
public ByteBuffer apply(WindowedValue<KV<K, V>> wv) { + try { + return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, wv.getValue().getKey())); + } catch (CoderException e) { + throw new RuntimeException(e); + } } }