This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ad22b6828cf7b78945e10cd60362c27a0447e66a
Author: Ismaël Mejía <ieme...@gmail.com>
AuthorDate: Fri May 10 11:44:53 2019 +0200

    Pass transform based doFnSchemaInformation in ParDo translation
---
 .../structuredstreaming/translation/batch/ParDoTranslatorBatch.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index b16d7e9..400b025 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -77,6 +77,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
         signature.stateDeclarations().size() > 0 || 
signature.timerDeclarations().size() > 0;
     checkState(!stateful, "States and timers are not supported for the 
moment.");
 
+    DoFnSchemaInformation doFnSchemaInformation =
+        ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
+
     // Init main variables
     Dataset<WindowedValue<InputT>> inputDataSet = 
context.getDataset(context.getInput());
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
@@ -110,7 +113,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
             inputCoder,
             outputCoderMap,
             broadcastStateData,
-            DoFnSchemaInformation.create());
+            doFnSchemaInformation);
 
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
         inputDataSet.mapPartitions(doFnWrapper, 
EncoderHelpers.tuple2Encoder());

Reply via email to