[BEAM-853] Force streaming execution on batch pipelines for testing. Expose the adapted source.
Force streaming execution, if set in PipelineOptions. Added test.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5f41deda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5f41deda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5f41deda

Branch: refs/heads/gearpump-runner
Commit: 5f41deda509acbbbc6280323e583bb3c1af2dad2
Parents: 1ad638e
Author: Sela <ans...@paypal.com>
Authored: Wed Dec 14 12:20:08 2016 +0200
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Dec 16 22:03:44 2016 -0800

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java    |   5 +
 .../beam/runners/spark/TestSparkRunner.java    |  80 +++++++++++-
 .../beam/runners/spark/ForceStreamingTest.java | 123 +++++++++++++++++++
 .../sdk/io/BoundedReadFromUnboundedSource.java |  14 ++-
 4 files changed, 217 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index a2cd887..04c559e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -100,4 +100,9 @@ public interface SparkPipelineOptions
   @Default.Boolean(false)
   boolean getUsesProvidedSparkContext();
   void setUsesProvidedSparkContext(boolean value);
+
+  @Description("A special flag that forces streaming in tests.")
+  @Default.Boolean(false)
+  boolean isForceStreaming();
+  void setForceStreaming(boolean forceStreaming);
 }
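
Editor's note: a minimal usage sketch of the new option (not part of this commit's diff); it follows the setup used by ForceStreamingTest further below:

    // Hedged sketch: enable forced streaming execution for a test pipeline.
    SparkPipelineOptions options =
        PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
    options.setRunner(TestSparkRunner.class);
    options.setForceStreaming(true);
    Pipeline pipeline = Pipeline.create(options);
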
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 2c26d84..798ca47 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -19,16 +19,26 @@ package org.apache.beam.runners.spark;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ValueWithRecordId;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+
 /**
  * The SparkRunner translate operations defined on a pipeline to a representation executable
  * by Spark, and then submitting the job to Spark to be executed. If we wanted to run a Beam
@@ -53,9 +63,12 @@ import org.apache.beam.sdk.values.POutput;
 public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   private SparkRunner delegate;
+  private boolean isForceStreaming;
+  private int expectedNumberOfAssertions = 0;
 
   private TestSparkRunner(SparkPipelineOptions options) {
     this.delegate = SparkRunner.fromOptions(options);
+    this.isForceStreaming = options.isForceStreaming();
   }
 
   public static TestSparkRunner fromOptions(PipelineOptions options) {
@@ -65,19 +78,78 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     return new TestSparkRunner(sparkOptions);
   }
 
+  /**
+   * Overrides for the test runner.
+   */
+  @SuppressWarnings("unchecked")
   @Override
-  public <OutputT extends POutput, InputT extends PInput>
-      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
-    return delegate.apply(transform, input);
-  };
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    // if the pipeline forces execution as a streaming pipeline,
+    // and the source is an adapted unbounded source (as bounded),
+    // read it as unbounded source via UnboundedReadFromBoundedSource.
+    if (isForceStreaming && transform instanceof BoundedReadFromUnboundedSource) {
+      return (OutputT) delegate.apply(new AdaptedBoundedAsUnbounded(
+          (BoundedReadFromUnboundedSource) transform), input);
+    } else {
+      // no actual override, simply counts asserting transforms in the pipeline.
+      if (transform instanceof PAssert.OneSideInputAssert
+          || transform instanceof PAssert.GroupThenAssert
+          || transform instanceof PAssert.GroupThenAssertForSingleton) {
+        expectedNumberOfAssertions += 1;
+      }
+
+      return delegate.apply(transform, input);
+    }
+  }
 
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
     TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
     SparkPipelineResult result = delegate.run(pipeline);
     result.waitUntilFinish();
+
+    // make sure the test pipeline finished successfully.
+    State resultState = result.getState();
+    assertThat(
+        String.format("Test pipeline result state was %s instead of %s", resultState, State.DONE),
+        resultState,
+        is(State.DONE));
     assertThat(result, testPipelineOptions.getOnCreateMatcher());
     assertThat(result, testPipelineOptions.getOnSuccessMatcher());
+
+    // if the pipeline was executed in streaming mode, validate aggregators.
+    if (isForceStreaming) {
+      // validate assertion succeeded (at least once).
+      int success = result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
+      assertThat(
+          String.format(
+              "Expected %d successful assertions, but found %d.",
+              expectedNumberOfAssertions, success),
+          success,
+          is(expectedNumberOfAssertions));
+      // validate assertion didn't fail.
+      int failure = result.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
+      assertThat("Failure aggregator should be zero.", failure, is(0));
+    }
     return result;
   }
+
+  private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {
+    private final BoundedReadFromUnboundedSource<T> source;
+
+    AdaptedBoundedAsUnbounded(BoundedReadFromUnboundedSource<T> source) {
+      this.source = source;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      PTransform<PBegin, ? extends PCollection<ValueWithRecordId<T>>> replacingTransform =
+          new UnboundedReadFromBoundedSource<>(source.getAdaptedSource());
+      return (PCollection<T>) input.apply(replacingTransform)
+          .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn()));
+    }
+  }
+
 }
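
Editor's note: a hedged sketch of the kind of test the aggregator validation above is meant to cover; MyUnboundedSource and the expected values are hypothetical placeholders, and "options" is the SparkPipelineOptions instance from the earlier sketch (forceStreaming enabled, TestSparkRunner set as runner):

    // Hypothetical test body: a bounded-over-unbounded read plus a PAssert.
    // With forceStreaming=true, TestSparkRunner swaps the read for an unbounded one,
    // counts the PAssert, and later checks PAssert.SUCCESS_COUNTER / FAILURE_COUNTER.
    Pipeline p = Pipeline.create(options);
    PCollection<String> output =
        p.apply(Read.from(new MyUnboundedSource()).withMaxNumRecords(10));
    PAssert.that(output).containsInAnyOrder("a", "b", "c");
    p.run();
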
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
new file mode 100644
index 0000000..eb17eea
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.junit.Test;
+
+
+/**
+ * Test that we can "force streaming" on pipelines with {@link BoundedReadFromUnboundedSource}
+ * inputs using the {@link TestSparkRunner}.
+ *
+ * <p>The test validates that when a pipeline reads from a {@link BoundedReadFromUnboundedSource},
+ * with {@link SparkPipelineOptions#setStreaming(boolean)} true
+ * and using the {@link TestSparkRunner}; the {@link Read.Bounded} transform
+ * is replaced by an {@link Read.Unbounded} transform.
+ *
+ * <p>This test does not execute a pipeline.
+ */
+public class ForceStreamingTest {
+
+  @Test
+  public void test() throws IOException {
+    SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    options.setRunner(TestSparkRunner.class);
+    // force streaming.
+    options.setForceStreaming(true);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    // apply the BoundedReadFromUnboundedSource.
+    @SuppressWarnings("unchecked")
+    BoundedReadFromUnboundedSource boundedRead =
+        Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+    //noinspection unchecked
+    pipeline.apply(boundedRead);
+
+    UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();
+    pipeline.traverseTopologically(unboundedReadDetector);
+
+    // assert that the applied BoundedReadFromUnboundedSource
+    // is being treated as an unbounded read.
+    assertThat("Expected to have an unbounded read.", unboundedReadDetector.isUnbounded);
+  }
+
+  /**
+   * Traverses the Pipeline to check if the input is indeed a {@link Read.Unbounded}.
+   */
+  private class UnboundedReadDetector extends Pipeline.PipelineVisitor.Defaults {
+    private boolean isUnbounded = false;
+
+    @Override
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+      Class<? extends PTransform> transformClass = node.getTransform().getClass();
+      if (transformClass == Read.Unbounded.class) {
+        isUnbounded = true;
+      }
+    }
+
+  }
+
+  /**
+   * A fake {@link UnboundedSource} to satisfy the compiler.
+   */
+  private static class FakeUnboundedSource extends UnboundedSource {
+
+    @Override
+    public List<? extends UnboundedSource> generateInitialSplits(
+        int desiredNumSplits,
+        PipelineOptions options) throws Exception {
+      return null;
+    }
+
+    @Override
+    public UnboundedReader createReader(
+        PipelineOptions options,
+        CheckpointMark checkpointMark) throws IOException {
+      return null;
+    }
+
+    @Override
+    public Coder getCheckpointMarkCoder() {
+      return null;
+    }
+
+    @Override
+    public void validate() { }
+
+    @Override
+    public Coder getDefaultOutputCoder() {
+      return null;
+    }
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index f2ef358..84e3044 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Distinct;
@@ -50,6 +51,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
   private final UnboundedSource<T, ?> source;
   private final long maxNumRecords;
   private final Duration maxReadTime;
+  private final BoundedSource<ValueWithRecordId<T>> adaptedSource;
   private static final FluentBackoff BACKOFF_FACTORY =
       FluentBackoff.DEFAULT
           .withInitialBackoff(Duration.millis(10))
@@ -81,12 +83,22 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     this.source = source;
     this.maxNumRecords = maxNumRecords;
     this.maxReadTime = maxReadTime;
+    this.adaptedSource = new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime);
+  }
+
+  /**
+   * Returns an adapted {@link BoundedSource} wrapping the underlying {@link UnboundedSource},
+   * with the specified bounds on number of records and read time.
+   */
+  @Experimental
+  public BoundedSource<ValueWithRecordId<T>> getAdaptedSource() {
+    return adaptedSource;
   }
 
   @Override
   public PCollection<T> expand(PBegin input) {
     PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
-        Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
+        Read.from(getAdaptedSource()));
     if (source.requiresDeduping()) {
       read = read.apply(Distinct.withRepresentativeValueFn(
           new SerializableFunction<ValueWithRecordId<T>, byte[]>() {
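
Editor's note: a hedged sketch of how the newly exposed getAdaptedSource() can be consumed outside the SDK, paralleling TestSparkRunner's AdaptedBoundedAsUnbounded above; the method name is illustrative and the imports assumed are the ones TestSparkRunner uses (UnboundedReadFromBoundedSource, ValueWithRecordId, ParDo, PBegin, PCollection):

    /** Hedged sketch: re-read a BoundedReadFromUnboundedSource as an unbounded read. */
    static <T> PCollection<T> readBoundedAsUnbounded(
        PBegin begin, BoundedReadFromUnboundedSource<T> boundedRead) {
      // Wrap the exposed adapted source in an unbounded read, then strip the
      // record ids the adapter attaches to each element.
      return begin
          .apply(new UnboundedReadFromBoundedSource<>(boundedRead.getAdaptedSource()))
          .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()));
    }
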