This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d9bfd5  [BEAM-6966] Spark portable runner: get PAssert working
     new e413098  Merge pull request #8323: [BEAM-6966] Spark portable runner: 
get PAssert working
2d9bfd5 is described below

commit 2d9bfd54737221c6472632cc9d4a49ef1a149f25
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Tue Apr 2 14:45:40 2019 -0700

    [BEAM-6966] Spark portable runner: get PAssert working
---
 .../SparkBatchPortablePipelineTranslator.java       |  9 +++++++++
 .../runners/spark/SparkPortableExecutionTest.java   | 21 +++++++++++++++------
 2 files changed, 24 insertions(+), 6 deletions(-)

diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
index 2ad0669..12f4ea9 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
@@ -215,6 +215,15 @@ public class SparkBatchPortablePipelineTranslator {
           staged.flatMap(new 
SparkExecutableStageExtractionFunction<>(outputMap.get(outputId)));
       context.pushDataset(outputId, new BoundedDataset<>(outputRdd));
     }
+    if (outputs.isEmpty()) {
+      // After pipeline translation, we traverse the set of unconsumed 
PCollections and add a
+      // no-op sink to each to make sure they are materialized by Spark. 
However, some SDK-executed
+      // stages have no runner-visible output after fusion. We handle this 
case by adding a sink
+      // here.
+      JavaRDD<WindowedValue<OutputT>> outputRdd =
+          staged.flatMap((rawUnionValue) -> Collections.emptyIterator());
+      context.pushDataset("EmptyOutputSink", new BoundedDataset<>(outputRdd));
+    }
   }
 
   /**
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPortableExecutionTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPortableExecutionTest.java
index 38bdd1f..d7d3428 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPortableExecutionTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPortableExecutionTest.java
@@ -18,11 +18,13 @@
 package org.apache.beam.runners.spark;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.sdk.Pipeline;
@@ -30,6 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Impulse;
@@ -39,6 +42,7 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
 import org.junit.AfterClass;
@@ -83,15 +87,15 @@ public class SparkPortableExecutionTest implements 
Serializable {
 
     Pipeline p = Pipeline.create(options);
 
-    final PCollectionView<Integer> view =
+    final PCollectionView<Long> view =
         p.apply("impulse23", Impulse.create())
             .apply(
                 "create23",
                 ParDo.of(
-                    new DoFn<byte[], Integer>() {
+                    new DoFn<byte[], Long>() {
                       @ProcessElement
                       public void process(ProcessContext context) {
-                        context.output(23);
+                        context.output(23L);
                       }
                     }))
             .apply(View.asSingleton());
@@ -127,8 +131,7 @@ public class SparkPortableExecutionTest implements 
Serializable {
                         new DoFn<KV<String, Iterable<Long>>, KV<String, 
Long>>() {
                           @ProcessElement
                           public void process(ProcessContext context) {
-                            LOG.info("Side input: {}", 
context.sideInput(view));
-                            LOG.info("Output element: {}", context.element());
+                            context.output(KV.of("bar", 
context.sideInput(view)));
                             for (Long i : context.element().getValue()) {
                               context.output(KV.of(context.element().getKey(), 
i));
                             }
@@ -138,7 +141,13 @@ public class SparkPortableExecutionTest implements 
Serializable {
             // Second GBK forces the output to be materialized
             .apply("gbk", GroupByKey.create());
 
-    // TODO Get PAssert working to test pipeline result
+    PAssert.that(result)
+        .containsInAnyOrder(
+            KV.of("foo", ImmutableList.of(4L, 3L, 3L)), KV.of("bar", 
ImmutableList.of(23L)));
+
+    // This is line below required to convert the PAssert's read to an 
impulse, which is expected
+    // by the GreedyPipelineFuser.
+    
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
 

Reply via email to