This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to
refs/heads/spark-runner_structured-streaming by this push:
new e771da0 Use more generic Row instead of GenericRowWithSchema
e771da0 is described below
commit e771da03b634d4ca0f67c75330799c7b3de466d0
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Feb 28 09:35:30 2019 +0100
Use more generic Row instead of GenericRowWithSchema
---
.../translation/batch/AggregatorCombinerGlobally.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index be8ece2..58f48e2 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -22,7 +22,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Aggregator;
public class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
@@ -42,8 +42,8 @@ public class AggregatorCombinerGlobally<InputT, AccumT, OutputT>
@Override
public AccumT reduce(AccumT accumulator, InputT input) {
// we receive a GenericRowWithSchema from spark containing an InputT
- GenericRowWithSchema genericRow = (GenericRowWithSchema) input;
- InputT t = RowHelpers.<InputT>extractObjectFromRow(genericRow);
+ Row row = (Row) input;
+ InputT t = RowHelpers.extractObjectFromRow(row);
return combineFn.addInput(accumulator, t);
}