This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 797b40d  Add PipelineResults to Spark structured streaming.
797b40d is described below

commit 797b40d6d91677d1ded3edd288c542b6cb85473f
Author: Ryan Skraba <r...@skraba.com>
AuthorDate: Fri Jul 12 18:52:48 2019 +0200

    Add PipelineResults to Spark structured streaming.
---
 runners/spark/build.gradle                         |   1 +
 .../beam/runners/spark/SparkRunnerRegistrar.java   |   4 +-
 .../SparkStructuredStreamingPipelineOptions.java   |   6 +
 .../SparkStructuredStreamingPipelineResult.java    | 120 +++++++++--
 .../SparkStructuredStreamingRunner.java            |  27 ++-
 .../translation/TranslationContext.java            |   5 +-
 .../runners/spark/SparkRunnerRegistrarTest.java    |   3 +-
 .../StructuredStreamingPipelineStateTest.java      | 222 +++++++++++++++++++++
 8 files changed, 364 insertions(+), 24 deletions(-)

diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index ca2e4a3..4c42e62 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -170,6 +170,7 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
   group = "Verification"
   def pipelineOptions = JsonOutput.toJson([
           "--runner=SparkStructuredStreamingRunner",
+          "--testMode=true",
           "--streaming=false",
   ])
   systemProperty "beamTestPipelineOptions", pipelineOptions
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 82cdf0a..1d1fbff 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark;
 
 import com.google.auto.service.AutoService;
+import 
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import 
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,7 +51,8 @@ public final class SparkRunnerRegistrar {
   public static class Options implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.of(SparkPipelineOptions.class);
+      return ImmutableList.of(
+          SparkPipelineOptions.class, 
SparkStructuredStreamingPipelineOptions.class);
     }
   }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
index bbf89f6..5573f78 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
@@ -32,4 +32,10 @@ public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipe
   Boolean getEnableSparkMetricSinks();
 
   void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks);
+
+  /** Set to true to run the job in test mode. */
+  @Default.Boolean(false)
+  boolean getTestMode();
+
+  void setTestMode(boolean testMode);
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
index ccec469..dc353d7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
@@ -20,41 +20,131 @@ package org.apache.beam.runners.spark.structuredstreaming;
 import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import java.io.IOException;
-import javax.annotation.Nullable;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import 
org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
 import org.joda.time.Duration;
 
 /** Represents a Spark pipeline execution result. */
-class SparkStructuredStreamingPipelineResult implements PipelineResult {
+public class SparkStructuredStreamingPipelineResult implements PipelineResult {
 
-  @Nullable // TODO: remove once method will be implemented
-  @Override
-  public State getState() {
-    return null;
+  final Future pipelineExecution;
+  final SparkSession sparkSession;
+  PipelineResult.State state;
+
+  // TODO: Implement results on a streaming pipeline. Currently does not 
stream.
+  final boolean isStreaming = false;
+
+  SparkStructuredStreamingPipelineResult(
+      final Future<?> pipelineExecution, final SparkSession sparkSession) {
+    this.pipelineExecution = pipelineExecution;
+    this.sparkSession = sparkSession;
+    // pipelineExecution is expected to have started executing eagerly.
+    this.state = State.RUNNING;
+  }
+
+  private static RuntimeException runtimeExceptionFrom(final Throwable e) {
+    return (e instanceof RuntimeException) ? (RuntimeException) e : new 
RuntimeException(e);
+  }
+
+  private static RuntimeException beamExceptionFrom(final Throwable e) {
+    // Scala doesn't declare checked exceptions in the bytecode, and the Java 
compiler
+    // won't let you catch something that is not declared, so we can't catch
+    // SparkException directly, instead we do an instanceof check.
+
+    if (e instanceof SparkException) {
+      if (e.getCause() != null && e.getCause() instanceof UserCodeException) {
+        UserCodeException userException = (UserCodeException) e.getCause();
+        return new 
Pipeline.PipelineExecutionException(userException.getCause());
+      } else if (e.getCause() != null) {
+        return new Pipeline.PipelineExecutionException(e.getCause());
+      }
+    }
+
+    return runtimeExceptionFrom(e);
+  }
+
+  protected void stop() {
+    try {
+      // TODO: await any outstanding queries on the session if this is 
streaming.
+      if (isStreaming) {
+        for (StreamingQuery query : sparkSession.streams().active()) {
+          query.stop();
+        }
+      }
+    } catch (Exception e) {
+      throw beamExceptionFrom(e);
+    } finally {
+      sparkSession.stop();
+      if (Objects.equals(state, State.RUNNING)) {
+        this.state = State.STOPPED;
+      }
+    }
+  }
+
+  private State awaitTermination(Duration duration)
+      throws TimeoutException, ExecutionException, InterruptedException {
+    pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
+    // Throws an exception if the job is not finished successfully in the 
given time.
+    // TODO: all streaming functionality
+    return PipelineResult.State.DONE;
   }
 
-  @Nullable // TODO: remove once method will be implemented
   @Override
-  public State cancel() throws IOException {
-    return null;
+  public PipelineResult.State getState() {
+    return state;
   }
 
-  @Nullable // TODO: remove once method will be implemented
   @Override
-  public State waitUntilFinish(Duration duration) {
-    return null;
+  public PipelineResult.State waitUntilFinish() {
+    return waitUntilFinish(Duration.millis(Long.MAX_VALUE));
   }
 
-  @Nullable // TODO: remove once method will be implemented
   @Override
-  public State waitUntilFinish() {
-    return null;
+  public State waitUntilFinish(final Duration duration) {
+    try {
+      State finishState = awaitTermination(duration);
+      offerNewState(finishState);
+
+    } catch (final TimeoutException e) {
+      // ignore.
+    } catch (final ExecutionException e) {
+      offerNewState(PipelineResult.State.FAILED);
+      throw beamExceptionFrom(e.getCause());
+    } catch (final Exception e) {
+      offerNewState(PipelineResult.State.FAILED);
+      throw beamExceptionFrom(e);
+    }
+
+    return state;
   }
 
   @Override
   public MetricResults metrics() {
     return 
asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
   }
+
+  @Override
+  public PipelineResult.State cancel() throws IOException {
+    offerNewState(PipelineResult.State.CANCELLED);
+    return state;
+  }
+
+  private void offerNewState(State newState) {
+    State oldState = this.state;
+    this.state = newState;
+    if (!oldState.isTerminal() && newState.isTerminal()) {
+      stop();
+    }
+  }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 29e4373..362c0e1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -19,6 +19,9 @@ package org.apache.beam.runners.spark.structuredstreaming;
 
 import static 
org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import 
org.apache.beam.runners.spark.structuredstreaming.aggregators.AggregatorsAccumulator;
 import 
org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetricSource;
@@ -131,12 +134,21 @@ public final class SparkStructuredStreamingRunner
     AggregatorsAccumulator.clear();
     MetricsAccumulator.clear();
 
-    TranslationContext translationContext = translatePipeline(pipeline);
-    // TODO initialise other services: checkpointing, metrics system, 
listeners, ...
-    // TODO pass testMode using pipelineOptions
-    translationContext.startPipeline(true);
+    final TranslationContext translationContext = translatePipeline(pipeline);
 
-    SparkStructuredStreamingPipelineResult result = new 
SparkStructuredStreamingPipelineResult();
+    final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+    final Future<?> submissionFuture =
+        executorService.submit(
+            () -> {
+              // TODO initialise other services: checkpointing, metrics 
system, listeners, ...
+              translationContext.startPipeline();
+            });
+    executorService.shutdown();
+
+    // TODO: Streaming.
+    SparkStructuredStreamingPipelineResult result =
+        new SparkStructuredStreamingPipelineResult(
+            submissionFuture, translationContext.getSparkSession());
 
     if (options.getEnableSparkMetricSinks()) {
       registerMetricsSource(options.getAppName());
@@ -147,6 +159,11 @@ public final class SparkStructuredStreamingRunner
             MetricsAccumulator.getInstance().value(), 
options.as(MetricsOptions.class), result);
     metricsPusher.start();
 
+    if (options.getTestMode()) {
+      result.waitUntilFinish();
+      result.stop();
+    }
+
     return result;
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 00bdea8..4157ea6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -180,7 +180,7 @@ public class TranslationContext {
   // 
--------------------------------------------------------------------------------------------
 
   /** Starts the pipeline. */
-  public void startPipeline(boolean testMode) {
+  public void startPipeline() {
     try {
       SparkStructuredStreamingPipelineOptions options =
           
serializablePipelineOptions.get().as(SparkStructuredStreamingPipelineOptions.class);
@@ -194,9 +194,10 @@ public class TranslationContext {
             dataStreamWriter =
                 dataStreamWriter.option("checkpointLocation", 
options.getCheckpointDir());
           }
+          // TODO: Do not await termination here.
           dataStreamWriter.foreach(new 
NoOpForeachWriter<>()).start().awaitTermination();
         } else {
-          if (testMode) {
+          if (options.getTestMode()) {
             // cannot use dataset.show because dataset schema is binary so it 
will print binary
             // code.
             List<WindowedValue> windowedValues = ((Dataset<WindowedValue>) 
dataset).collectAsList();
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 0cff3ec..e804423 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.util.ServiceLoader;
+import 
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import 
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
@@ -36,7 +37,7 @@ public class SparkRunnerRegistrarTest {
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class),
+        ImmutableList.of(SparkPipelineOptions.class, 
SparkStructuredStreamingPipelineOptions.class),
         new SparkRunnerRegistrar.Options().getPipelineOptions());
   }
 
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/StructuredStreamingPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/StructuredStreamingPipelineStateTest.java
new file mode 100644
index 0000000..4718b9d
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/StructuredStreamingPipelineStateTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/** This suite tests that various scenarios result in proper states of the 
pipeline. */
+public class StructuredStreamingPipelineStateTest implements Serializable {
+
+  private static class MyCustomException extends RuntimeException {
+
+    MyCustomException(final String message) {
+      super(message);
+    }
+  }
+
+  private final transient SparkStructuredStreamingPipelineOptions options =
+      PipelineOptionsFactory.as(SparkStructuredStreamingPipelineOptions.class);
+
+  @Rule public transient TestName testName = new TestName();
+
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the 
batch intentionally";
+
+  private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
+    return ParDo.of(
+        new DoFn<String, String>() {
+
+          @ProcessElement
+          public void processElement(final ProcessContext c) {
+            System.out.println(prefix + " " + c.element());
+          }
+        });
+  }
+
+  private PTransform<PBegin, PCollection<String>> getValues(
+      final SparkStructuredStreamingPipelineOptions options) {
+    final boolean doNotSyncWithWatermark = false;
+    return options.isStreaming()
+        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), 
doNotSyncWithWatermark)
+            .nextBatch("one", "two")
+        : Create.of("one", "two");
+  }
+
+  private SparkStructuredStreamingPipelineOptions getStreamingOptions() {
+    options.setRunner(SparkStructuredStreamingRunner.class);
+    options.setStreaming(true);
+    return options;
+  }
+
+  private SparkStructuredStreamingPipelineOptions getBatchOptions() {
+    options.setRunner(SparkStructuredStreamingRunner.class);
+    options.setStreaming(false); // explicit because options is reused 
throughout the test.
+    return options;
+  }
+
+  private Pipeline getPipeline(final SparkStructuredStreamingPipelineOptions 
options) {
+
+    final Pipeline pipeline = Pipeline.create(options);
+    final String name = testName.getMethodName() + "(isStreaming=" + 
options.isStreaming() + ")";
+
+    
pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
+
+    return pipeline;
+  }
+
+  private void testFailedPipeline(final 
SparkStructuredStreamingPipelineOptions options)
+      throws Exception {
+
+    SparkStructuredStreamingPipelineResult result = null;
+
+    try {
+      final Pipeline pipeline = Pipeline.create(options);
+      pipeline
+          .apply(getValues(options))
+          .setCoder(StringUtf8Coder.of())
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<String, String>() {
+
+                    @Override
+                    public String apply(final String input) {
+                      throw new 
MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
+                    }
+                  }));
+
+      result = (SparkStructuredStreamingPipelineResult) pipeline.run();
+      result.waitUntilFinish();
+    } catch (final Exception e) {
+      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
+      assertThat(e.getCause(), instanceOf(MyCustomException.class));
+      assertThat(e.getCause().getMessage(), 
is(FAILED_THE_BATCH_INTENTIONALLY));
+      assertThat(result.getState(), is(PipelineResult.State.FAILED));
+      result.cancel();
+      return;
+    }
+
+    fail("An injected failure did not affect the pipeline as expected.");
+  }
+
+  private void testTimeoutPipeline(final 
SparkStructuredStreamingPipelineOptions options)
+      throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    final SparkStructuredStreamingPipelineResult result =
+        (SparkStructuredStreamingPipelineResult) pipeline.run();
+
+    result.waitUntilFinish(Duration.millis(1));
+
+    assertThat(result.getState(), is(PipelineResult.State.RUNNING));
+
+    result.cancel();
+  }
+
+  private void testCanceledPipeline(final 
SparkStructuredStreamingPipelineOptions options)
+      throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    final SparkStructuredStreamingPipelineResult result =
+        (SparkStructuredStreamingPipelineResult) pipeline.run();
+
+    result.cancel();
+
+    assertThat(result.getState(), is(PipelineResult.State.CANCELLED));
+  }
+
+  private void testRunningPipeline(final 
SparkStructuredStreamingPipelineOptions options)
+      throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    final SparkStructuredStreamingPipelineResult result =
+        (SparkStructuredStreamingPipelineResult) pipeline.run();
+
+    assertThat(result.getState(), is(PipelineResult.State.RUNNING));
+
+    result.cancel();
+  }
+
+  @Ignore("TODO: Reactivate with streaming.")
+  @Test
+  public void testStreamingPipelineRunningState() throws Exception {
+    testRunningPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineRunningState() throws Exception {
+    testRunningPipeline(getBatchOptions());
+  }
+
+  @Ignore("TODO: Reactivate with streaming.")
+  @Test
+  public void testStreamingPipelineCanceledState() throws Exception {
+    testCanceledPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineCanceledState() throws Exception {
+    testCanceledPipeline(getBatchOptions());
+  }
+
+  @Ignore("TODO: Reactivate with streaming.")
+  @Test
+  public void testStreamingPipelineFailedState() throws Exception {
+    testFailedPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineFailedState() throws Exception {
+    testFailedPipeline(getBatchOptions());
+  }
+
+  @Ignore("TODO: Reactivate with streaming.")
+  @Test
+  public void testStreamingPipelineTimeoutState() throws Exception {
+    testTimeoutPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineTimeoutState() throws Exception {
+    testTimeoutPipeline(getBatchOptions());
+  }
+}

Reply via email to