This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to 
refs/heads/spark-runner_structured-streaming by this push:
     new e771da0  Use more generic Row instead of GenericRowWithSchema
e771da0 is described below

commit e771da03b634d4ca0f67c75330799c7b3de466d0
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Feb 28 09:35:30 2019 +0100

    Use more generic Row instead of GenericRowWithSchema
---
 .../translation/batch/AggregatorCombinerGlobally.java               | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index be8ece2..58f48e2 100644
--- 
a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -22,7 +22,7 @@ import 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
 import 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.expressions.Aggregator;
 
 public class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
@@ -42,8 +42,8 @@ public class AggregatorCombinerGlobally<InputT, AccumT, 
OutputT>
   @Override
   public AccumT reduce(AccumT accumulator, InputT input) {
     // we receive a GenericRowWithSchema from spark containing an InputT
-    GenericRowWithSchema genericRow = (GenericRowWithSchema) input;
-    InputT t = RowHelpers.<InputT>extractObjectFromRow(genericRow);
+    Row row = (Row) input;
+    InputT t = RowHelpers.extractObjectFromRow(row);
     return combineFn.addInput(accumulator, t);
   }
 

Reply via email to