This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f19721f35b9249c1db712dc4c72a588105019726
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Tue Jan 15 16:00:55 2019 +0100

    Fix testMode output to comply with new binary schema
---
 .../spark/structuredstreaming/translation/TranslationContext.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git 
a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 33706bd..0f20663 100644
--- 
a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -180,7 +181,11 @@ public class TranslationContext {
           dataset.writeStream().foreach(new 
NoOpForeachWriter<>()).start().awaitTermination();
         } else {
           if (testMode){
-            dataset.show();
+            // cannot use dataset.show() because the dataset schema is binary, so it would print binary data
+            List<WindowedValue> windowedValues = 
((Dataset<WindowedValue>)dataset).collectAsList();
+            for (WindowedValue windowedValue : windowedValues){
+              System.out.println(windowedValue);
+            }
           } else {
            // apply a dummy fn just to apply a foreach action that will trigger the pipeline run in spark
             dataset.foreachPartition(t -> {

Reply via email to