This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 92e6953  Fix encoder bug in combinePerkey
92e6953 is described below

commit 92e6953f00246d5779a5a93451298e9c4a584b4d
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Feb 28 10:17:13 2019 +0100

    Fix encoder bug in combinePerkey
---
 .../translation/batch/CombinePerKeyTranslatorBatch.java                 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index d38c614..177de87 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -54,7 +54,7 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
 
     Dataset<KV<K, InputT>> keyedDataset =
         inputDataset.map(
-            WindowingHelpers.unwindowMapFunction(), EncoderHelpers.windowedValueEncoder());
+            WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
 
     KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
         keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());

Reply via email to