This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 797b40d  Add PipelineResults to Spark structured streaming.
797b40d is described below

commit 797b40d6d91677d1ded3edd288c542b6cb85473f
Author: Ryan Skraba <r...@skraba.com>
AuthorDate: Fri Jul 12 18:52:48 2019 +0200

    Add PipelineResults to Spark structured streaming.
---
 runners/spark/build.gradle                        |   1 +
 .../beam/runners/spark/SparkRunnerRegistrar.java  |   4 +-
 .../SparkStructuredStreamingPipelineOptions.java  |   6 +
 .../SparkStructuredStreamingPipelineResult.java   | 120 +++++++++--
 .../SparkStructuredStreamingRunner.java           |  27 ++-
 .../translation/TranslationContext.java           |   5 +-
 .../runners/spark/SparkRunnerRegistrarTest.java   |   3 +-
 .../StructuredStreamingPipelineStateTest.java     | 222 +++++++++++++++++++++
 8 files changed, 364 insertions(+), 24 deletions(-)

diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index ca2e4a3..4c42e62 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -170,6 +170,7 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
   group = "Verification"
   def pipelineOptions = JsonOutput.toJson([
       "--runner=SparkStructuredStreamingRunner",
+      "--testMode=true",
       "--streaming=false",
   ])
   systemProperty "beamTestPipelineOptions", pipelineOptions
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 82cdf0a..1d1fbff 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark;
 
 import com.google.auto.service.AutoService;
+import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,7 +51,8 @@ public final class SparkRunnerRegistrar {
   public static class Options implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.of(SparkPipelineOptions.class);
+      return ImmutableList.of(
+          SparkPipelineOptions.class, SparkStructuredStreamingPipelineOptions.class);
     }
   }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
index bbf89f6..5573f78 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
@@ -32,4 +32,10 @@ public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipe
   Boolean getEnableSparkMetricSinks();
 
   void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks);
+
+  /** Set to true to run the job in test mode. */
+  @Default.Boolean(false)
+  boolean getTestMode();
+
+  void setTestMode(boolean testMode);
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
index ccec469..dc353d7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
@@ -20,41 +20,131 @@ package org.apache.beam.runners.spark.structuredstreaming;
 
 import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import java.io.IOException;
-import javax.annotation.Nullable;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
 import org.joda.time.Duration;
 
 /** Represents a Spark pipeline execution result. */
-class SparkStructuredStreamingPipelineResult implements PipelineResult {
+public class SparkStructuredStreamingPipelineResult implements PipelineResult {
 
-  @Nullable // TODO: remove once method will be implemented
-  @Override
-  public State getState() {
-    return null;
+  final Future pipelineExecution;
+  final SparkSession sparkSession;
+  PipelineResult.State state;
+
+  // TODO: Implement results on a streaming pipeline. Currently does not stream.
+  final boolean isStreaming = false;
+
+  SparkStructuredStreamingPipelineResult(
+      final Future<?> pipelineExecution, final SparkSession sparkSession) {
+    this.pipelineExecution = pipelineExecution;
+    this.sparkSession = sparkSession;
+    // pipelineExecution is expected to have started executing eagerly.
+    this.state = State.RUNNING;
+  }
+
+  private static RuntimeException runtimeExceptionFrom(final Throwable e) {
+    return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
+  }
+
+  private static RuntimeException beamExceptionFrom(final Throwable e) {
+    // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
+    // won't let you catch something that is not declared, so we can't catch
+    // SparkException directly, instead we do an instanceof check.
+
+    if (e instanceof SparkException) {
+      if (e.getCause() != null && e.getCause() instanceof UserCodeException) {
+        UserCodeException userException = (UserCodeException) e.getCause();
+        return new Pipeline.PipelineExecutionException(userException.getCause());
+      } else if (e.getCause() != null) {
+        return new Pipeline.PipelineExecutionException(e.getCause());
+      }
+    }
+
+    return runtimeExceptionFrom(e);
+  }
+
+  protected void stop() {
+    try {
+      // TODO: await any outstanding queries on the session if this is streaming.
+      if (isStreaming) {
+        for (StreamingQuery query : sparkSession.streams().active()) {
+          query.stop();
+        }
+      }
+    } catch (Exception e) {
+      throw beamExceptionFrom(e);
+    } finally {
+      sparkSession.stop();
+      if (Objects.equals(state, State.RUNNING)) {
+        this.state = State.STOPPED;
+      }
+    }
+  }
+
+  private State awaitTermination(Duration duration)
+      throws TimeoutException, ExecutionException, InterruptedException {
+    pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
+    // Throws an exception if the job is not finished successfully in the given time.
+    // TODO: all streaming functionality
+    return PipelineResult.State.DONE;
   }
 
-  @Nullable // TODO: remove once method will be implemented
   @Override
-  public State cancel() throws IOException {
-    return null;
+  public PipelineResult.State getState() {
+    return state;
   }
 
-  @Nullable // TODO: remove once method will be implemented
   @Override
-  public State waitUntilFinish(Duration duration) {
-    return null;
+  public PipelineResult.State waitUntilFinish() {
+    return waitUntilFinish(Duration.millis(Long.MAX_VALUE));
   }
 
-  @Nullable // TODO: remove once method will be implemented
   @Override
-  public State waitUntilFinish() {
-    return null;
+  public State waitUntilFinish(final Duration duration) {
+    try {
+      State finishState = awaitTermination(duration);
+      offerNewState(finishState);
+
+    } catch (final TimeoutException e) {
+      // ignore.
+    } catch (final ExecutionException e) {
+      offerNewState(PipelineResult.State.FAILED);
+      throw beamExceptionFrom(e.getCause());
+    } catch (final Exception e) {
+      offerNewState(PipelineResult.State.FAILED);
+      throw beamExceptionFrom(e);
+    }
+
+    return state;
   }
 
   @Override
   public MetricResults metrics() {
     return asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
   }
+
+  @Override
+  public PipelineResult.State cancel() throws IOException {
+    offerNewState(PipelineResult.State.CANCELLED);
+    return state;
+  }
+
+  private void offerNewState(State newState) {
+    State oldState = this.state;
+    this.state = newState;
+    if (!oldState.isTerminal() && newState.isTerminal()) {
+      stop();
+    }
+  }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 29e4373..362c0e1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -19,6 +19,9 @@ package org.apache.beam.runners.spark.structuredstreaming;
 
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import org.apache.beam.runners.spark.structuredstreaming.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetricSource;
@@ -131,12 +134,21 @@ public final class SparkStructuredStreamingRunner
     AggregatorsAccumulator.clear();
     MetricsAccumulator.clear();
 
-    TranslationContext translationContext = translatePipeline(pipeline);
-    // TODO initialise other services: checkpointing, metrics system, listeners, ...
-    // TODO pass testMode using pipelineOptions
-    translationContext.startPipeline(true);
+    final TranslationContext translationContext = translatePipeline(pipeline);
 
-    SparkStructuredStreamingPipelineResult result = new SparkStructuredStreamingPipelineResult();
+    final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    final Future<?> submissionFuture =
+        executorService.submit(
+            () -> {
+              // TODO initialise other services: checkpointing, metrics system, listeners, ...
+              translationContext.startPipeline();
+            });
+    executorService.shutdown();
+
+    // TODO: Streaming.
+    SparkStructuredStreamingPipelineResult result =
+        new SparkStructuredStreamingPipelineResult(
+            submissionFuture, translationContext.getSparkSession());
 
     if (options.getEnableSparkMetricSinks()) {
       registerMetricsSource(options.getAppName());
@@ -147,6 +159,11 @@ public final class SparkStructuredStreamingRunner
         MetricsAccumulator.getInstance().value(), options.as(MetricsOptions.class), result);
     metricsPusher.start();
 
+    if (options.getTestMode()) {
+      result.waitUntilFinish();
+      result.stop();
+    }
+
     return result;
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 00bdea8..4157ea6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -180,7 +180,7 @@ public class TranslationContext {
   // --------------------------------------------------------------------------------------------
 
   /** Starts the pipeline. */
-  public void startPipeline(boolean testMode) {
+  public void startPipeline() {
     try {
       SparkStructuredStreamingPipelineOptions options =
           serializablePipelineOptions.get().as(SparkStructuredStreamingPipelineOptions.class);
@@ -194,9 +194,10 @@ public class TranslationContext {
           dataStreamWriter =
               dataStreamWriter.option("checkpointLocation", options.getCheckpointDir());
         }
+        // TODO: Do not await termination here.
         dataStreamWriter.foreach(new NoOpForeachWriter<>()).start().awaitTermination();
       } else {
-        if (testMode) {
+        if (options.getTestMode()) {
           // cannot use dataset.show because dataset schema is binary so it will print binary
           // code.
           List<WindowedValue> windowedValues = ((Dataset<WindowedValue>) dataset).collectAsList();
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 0cff3ec..e804423 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.util.ServiceLoader;
+import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
@@ -36,7 +37,7 @@ public class SparkRunnerRegistrarTest {
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class),
+        ImmutableList.of(SparkPipelineOptions.class, SparkStructuredStreamingPipelineOptions.class),
         new SparkRunnerRegistrar.Options().getPipelineOptions());
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/StructuredStreamingPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/StructuredStreamingPipelineStateTest.java
new file mode 100644
index 0000000..4718b9d
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/StructuredStreamingPipelineStateTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/** This suite tests that various scenarios result in proper states of the pipeline. */
+public class StructuredStreamingPipelineStateTest implements Serializable {
+
+  private static class MyCustomException extends RuntimeException {
+
+    MyCustomException(final String message) {
+      super(message);
+    }
+  }
+
+  private final transient SparkStructuredStreamingPipelineOptions options =
+      PipelineOptionsFactory.as(SparkStructuredStreamingPipelineOptions.class);
+
+  @Rule public transient TestName testName = new TestName();
+
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
+
+  private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
+    return ParDo.of(
+        new DoFn<String, String>() {
+
+          @ProcessElement
+          public void processElement(final ProcessContext c) {
+            System.out.println(prefix + " " + c.element());
+          }
+        });
+  }
+
+  private PTransform<PBegin, PCollection<String>> getValues(
+      final SparkStructuredStreamingPipelineOptions options) {
+    final boolean doNotSyncWithWatermark = false;
+    return options.isStreaming()
+        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark)
+            .nextBatch("one", "two")
+        : Create.of("one", "two");
+  }
+
+  private SparkStructuredStreamingPipelineOptions getStreamingOptions() {
+    options.setRunner(SparkStructuredStreamingRunner.class);
+    options.setStreaming(true);
+    return options;
+  }
+
+  private SparkStructuredStreamingPipelineOptions getBatchOptions() {
+    options.setRunner(SparkStructuredStreamingRunner.class);
+    options.setStreaming(false); // explicit because options is reused throughout the test.
+    return options;
+  }
+
+  private Pipeline getPipeline(final SparkStructuredStreamingPipelineOptions options) {
+
+    final Pipeline pipeline = Pipeline.create(options);
+    final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")";
+
+    pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
+
+    return pipeline;
+  }
+
+  private void testFailedPipeline(final SparkStructuredStreamingPipelineOptions options)
+      throws Exception {
+
+    SparkStructuredStreamingPipelineResult result = null;
+
+    try {
+      final Pipeline pipeline = Pipeline.create(options);
+      pipeline
+          .apply(getValues(options))
+          .setCoder(StringUtf8Coder.of())
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<String, String>() {
+
+                    @Override
+                    public String apply(final String input) {
+                      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
+                    }
+                  }));
+
+      result = (SparkStructuredStreamingPipelineResult) pipeline.run();
+      result.waitUntilFinish();
+    } catch (final Exception e) {
+      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
+      assertThat(e.getCause(), instanceOf(MyCustomException.class));
+      assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
+      assertThat(result.getState(), is(PipelineResult.State.FAILED));
+      result.cancel();
+      return;
+    }
+
+    fail("An injected failure did not affect the pipeline as expected.");
+  }
+
+  private void testTimeoutPipeline(final SparkStructuredStreamingPipelineOptions options)
+      throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    final SparkStructuredStreamingPipelineResult result =
+        (SparkStructuredStreamingPipelineResult) pipeline.run();
+
+    result.waitUntilFinish(Duration.millis(1));
+
+    assertThat(result.getState(), is(PipelineResult.State.RUNNING));
+
+    result.cancel();
+  }
+
+  private void testCanceledPipeline(final SparkStructuredStreamingPipelineOptions options)
+      throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    final SparkStructuredStreamingPipelineResult result =
+        (SparkStructuredStreamingPipelineResult) pipeline.run();
+
+    result.cancel();
+
+    assertThat(result.getState(), is(PipelineResult.State.CANCELLED));
+  }
+
+  private void testRunningPipeline(final SparkStructuredStreamingPipelineOptions options)
+      throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    final SparkStructuredStreamingPipelineResult result =
+        (SparkStructuredStreamingPipelineResult) pipeline.run();
+
+    assertThat(result.getState(), is(PipelineResult.State.RUNNING));
+
+    result.cancel();
+  }
+
+  @Ignore("TODO: Reactivate with streaming.")
+  @Test
+  public void testStreamingPipelineRunningState() throws Exception {
+    testRunningPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineRunningState() throws Exception {
+    testRunningPipeline(getBatchOptions());
+  }
+
+  @Ignore("TODO: Reactivate with streaming.")
+  @Test
+  public void testStreamingPipelineCanceledState() throws Exception {
+    testCanceledPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineCanceledState() throws Exception {
+    testCanceledPipeline(getBatchOptions());
+  }
+
+  @Ignore("TODO: Reactivate with streaming.")
+  @Test
+  public void testStreamingPipelineFailedState() throws Exception {
+    testFailedPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineFailedState() throws Exception {
+    testFailedPipeline(getBatchOptions());
+  }
+
+  @Ignore("TODO: Reactivate with streaming.")
+  @Test
+  public void testStreamingPipelineTimeoutState() throws Exception {
+    testTimeoutPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineTimeoutState() throws Exception {
+    testTimeoutPipeline(getBatchOptions());
+  }
+}
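
For context, here is a minimal sketch (not part of the commit) of how a driver program could exercise the new testMode option and the now-functional pipeline result. The class name, element values, and map function below are illustrative assumptions; only the runner, the SparkStructuredStreamingPipelineOptions accessors, and the run()/waitUntilFinish()/getState() behaviour come from the patch above.

    // TestModeExample.java -- illustrative only, not part of this commit.
    import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
    import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;

    public class TestModeExample {
      public static void main(String[] args) {
        SparkStructuredStreamingPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(SparkStructuredStreamingPipelineOptions.class);
        options.setRunner(SparkStructuredStreamingRunner.class);
        options.setStreaming(false);
        // testMode makes the runner collect each leaf dataset and wait for the job to finish.
        options.setTestMode(true);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(Create.of("one", "two"))
            .apply(
                MapElements.via(
                    new SimpleFunction<String, String>() {
                      @Override
                      public String apply(String input) {
                        return input.toUpperCase();
                      }
                    }));

        // With testMode=true, run() already calls waitUntilFinish(), so the result is terminal here.
        PipelineResult result = pipeline.run();
        System.out.println("Final state: " + result.getState());
      }
    }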