This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 039f58a6a07e567bb8c5636caecebc61dec9129e Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Mon Sep 30 12:13:25 2019 +0200 Apply new Encoders to GroupByKey --- .../batch/GroupByKeyTranslatorBatch.java | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 3e203a8..2970aa7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -54,11 +56,21 @@ class GroupByKeyTranslatorBatch<K, V> Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection); + WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy(); + KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder(); + // group by key only + Coder<K> keyCoder = kvCoder.getKeyCoder(); KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = - input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder()); 
+ input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder( + keyCoder)); // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable + Coder<V> valueCoder = kvCoder.getValueCoder(); + WindowedValue.WindowedValueCoder<V> wvCoder = + WindowedValue.FullWindowedValueCoder.of( + valueCoder, inputPCollection.getWindowingStrategy().getWindowFn().windowCoder()); + IterableCoder<WindowedValue<V>> iterableCoder = IterableCoder.of(wvCoder); Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized = groupByKeyOnly.mapGroups( (MapGroupsFunction<K, WindowedValue<KV<K, V>>, KV<K, Iterable<WindowedValue<V>>>>) @@ -77,19 +89,20 @@ class GroupByKeyTranslatorBatch<K, V> KV.of(key, Iterables.unmodifiableIterable(values)); return kv; }, - EncoderHelpers.kvEncoder()); + EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder))); - WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy(); - KvCoder<K, V> coder = (KvCoder<K, V>) inputPCollection.getCoder(); // group also by windows + WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder = WindowedValue.FullWindowedValueCoder + .of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), + windowingStrategy.getWindowFn().windowCoder()); Dataset<WindowedValue<KV<K, Iterable<V>>>> output = materialized.flatMap( new GroupAlsoByWindowViaOutputBufferFn<>( windowingStrategy, new InMemoryStateInternalsFactory<>(), - SystemReduceFn.buffering(coder.getValueCoder()), + SystemReduceFn.buffering(valueCoder), context.getSerializableOptions()), - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(outputCoder)); context.putDataset(context.getOutput(), output); }