This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit ad22b6828cf7b78945e10cd60362c27a0447e66a
Author: Ismaël Mejía <ieme...@gmail.com>
AuthorDate: Fri May 10 11:44:53 2019 +0200

    Pass transform based doFnSchemaInformation in ParDo translation
---
 .../structuredstreaming/translation/batch/ParDoTranslatorBatch.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index b16d7e9..400b025 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -77,6 +77,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
         signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
     checkState(!stateful, "States and timers are not supported for the moment.");
 
+    DoFnSchemaInformation doFnSchemaInformation =
+        ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
+
     // Init main variables
     Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
@@ -110,7 +113,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
             inputCoder,
             outputCoderMap,
             broadcastStateData,
-            DoFnSchemaInformation.create());
+            doFnSchemaInformation);
 
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
         inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());