This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit f19721f35b9249c1db712dc4c72a588105019726
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Tue Jan 15 16:00:55 2019 +0100

    Fix testMode output to comply with new binary schema
---
 .../spark/structuredstreaming/translation/TranslationContext.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 33706bd..0f20663 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -180,7 +181,11 @@ public class TranslationContext {
           dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
         } else {
           if (testMode){
-            dataset.show();
+            // cannot use dataset.show because dataset schema is binary so it will print binary code.
+            List<WindowedValue> windowedValues = ((Dataset<WindowedValue>)dataset).collectAsList();
+            for (WindowedValue windowedValue : windowedValues){
+              System.out.println(windowedValue);
+            }
           } else {
             // apply a dummy fn just to apply forech action that will trigger the pipeline run in spark
             dataset.foreachPartition(t -> {