Repository: incubator-beam Updated Branches: refs/heads/apex-runner 968eb32b8 -> 51af7e592
BEAM-858 Enable ApexRunner integration test in examples. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77f4ba2e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77f4ba2e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77f4ba2e Branch: refs/heads/apex-runner Commit: 77f4ba2eebf0c02dc95a8b90b4277a12638c4300 Parents: 968eb32 Author: Thomas Weise <t...@apache.org> Authored: Fri Oct 28 23:47:42 2016 -0700 Committer: Thomas Weise <t...@apache.org> Committed: Tue Nov 1 03:35:15 2016 +0100 ---------------------------------------------------------------------- examples/java/pom.xml | 31 ++++++++++++++++++++ runners/apex/pom.xml | 2 +- .../beam/runners/apex/ApexPipelineOptions.java | 6 ---- .../runners/apex/ApexPipelineTranslator.java | 2 +- .../apache/beam/runners/apex/ApexRunner.java | 12 -------- .../beam/runners/apex/ApexRunnerResult.java | 27 +++++++++++++++-- .../beam/runners/apex/TestApexRunner.java | 20 +++++++++++-- .../io/ApexReadUnboundedInputOperator.java | 13 ++++++-- .../apex/examples/StreamingWordCountTest.java | 4 +-- .../translators/ParDoBoundTranslatorTest.java | 6 ++-- 10 files changed, 91 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index fc82ed4..6c1a16f 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -85,6 +85,14 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-apex</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-spark</artifactId> <version>${project.version}</version> <scope>runtime</scope> @@ -224,6 +232,29 @@ </systemPropertyVariables> </configuration> </execution> + <execution> + <id>apex-runner-integration-tests</id> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + <configuration> + <includes> + <include>WordCountIT.java</include> + </includes> + <parallel>all</parallel> + <threadCount>4</threadCount> + <systemPropertyVariables> + <beamTestPipelineOptions> + [ + "--project=apache-beam-testing", + "--tempRoot=gs://temp-storage-for-end-to-end-tests", + "--runner=org.apache.beam.runners.apex.TestApexRunner" + ] + </beamTestPipelineOptions> + </systemPropertyVariables> + </configuration> + </execution> </executions> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 6ccc0da..d4bcc3d 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -72,7 +72,7 @@ <groupId>org.apache.apex</groupId> <artifactId>apex-engine</artifactId> <version>${apex.core.version}</version> - <scope>test</scope> + <scope>runtime</scope> </dependency> <!--- Beam --> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java index 141a8c1..54fdf76 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java @@ -32,12 +32,6 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab String getApplicationName(); - @Description("set parallelism for Apex runner") - void setParallelism(int parallelism); - - @Default.Integer(1) - int getParallelism(); - @Description("execute the pipeline with embedded cluster") void setEmbeddedExecution(boolean embedded); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java index a6857ee..8a87ce0 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java @@ -150,7 +150,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>( transform.getSource()); ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( - unboundedSource, context.getPipelineOptions()); + unboundedSource, true, context.getPipelineOptions()); context.addOperator(operator, operator.output); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 8da4ec3..416e99c 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -137,18 +137,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } assertionError = null; lc.runAsync(); - if (options.getRunMillis() > 0) { - try { - long timeout = System.currentTimeMillis() + options.getRunMillis(); - while (System.currentTimeMillis() < timeout) { - if (assertionError != null) { - throw assertionError; - } - } - } finally { - lc.shutdown(); - } - } return new ApexRunnerResult(lma.getDAG(), lc); } catch (Exception e) { Throwables.propagateIfPossible(e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java index 4e3a8d2..03428a6 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -21,6 +21,7 @@ import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode; import java.io.IOException; +import java.lang.reflect.Field; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; @@ -63,12 +64,12 @@ public class ApexRunnerResult implements PipelineResult { @Override public State waitUntilFinish(Duration duration) { - throw new UnsupportedOperationException(); + return ApexRunnerResult.waitUntilFinished(ctrl, duration); } @Override public State waitUntilFinish() { - throw new UnsupportedOperationException(); + return ApexRunnerResult.waitUntilFinished(ctrl, null); } @Override @@ -84,4 +85,26 @@ public class ApexRunnerResult implements PipelineResult { return apexDAG; } + public static State waitUntilFinished(LocalMode.Controller ctrl, Duration duration) { + // we need to rely on internal field for now + // Apex should make it available through API in upcoming release. + long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE + : System.currentTimeMillis() + duration.getMillis(); + Field appDoneField; + try { + appDoneField = ctrl.getClass().getDeclaredField("appDone"); + appDoneField.setAccessible(true); + while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) { + if (ApexRunner.assertionError != null) { + throw ApexRunner.assertionError; + } + Thread.sleep(500); + } + return appDoneField.getBoolean(ctrl) ? State.DONE : null; + } catch (NoSuchFieldException | SecurityException | IllegalArgumentException + | IllegalAccessException | InterruptedException e) { + throw new RuntimeException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java index 2e048f0..e447e37 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.apex; +import java.io.IOException; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; @@ -24,18 +26,19 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.joda.time.Duration; /** * Apex {@link PipelineRunner} for testing. */ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> { - private ApexRunner delegate; + private static final int RUN_WAIT_MILLIS = 20000; + private final ApexRunner delegate; private TestApexRunner(ApexPipelineOptions options) { options.setEmbeddedExecution(true); //options.setEmbeddedExecutionDebugMode(false); - options.setRunMillis(20000); this.delegate = ApexRunner.fromOptions(options); } @@ -53,7 +56,18 @@ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> { @Override public ApexRunnerResult run(Pipeline pipeline) { - return delegate.run(pipeline); + ApexRunnerResult result = delegate.run(pipeline); + try { + // this is necessary for tests that just call run() and not waitUntilFinish + result.waitUntilFinish(Duration.millis(RUN_WAIT_MILLIS)); + return result; + } finally { + try { + result.cancel(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java index 3188dfa..0e2b0c2 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java @@ -55,6 +55,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT private final SerializablePipelineOptions pipelineOptions; @Bind(JavaSerializer.class) private final UnboundedSource<OutputT, CheckpointMarkT> source; + private final boolean isBoundedSource; private transient UnboundedSource.UnboundedReader<OutputT> reader; private transient boolean available = false; @OutputPortFieldAnnotation(optional = true) @@ -65,16 +66,24 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT ApexPipelineOptions options) { this.pipelineOptions = new SerializablePipelineOptions(options); this.source = source; + this.isBoundedSource = false; + } + + public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, + boolean isBoundedSource, ApexPipelineOptions options) { + this.pipelineOptions = new SerializablePipelineOptions(options); + this.source = source; + this.isBoundedSource = isBoundedSource; } @SuppressWarnings("unused") // for Kryo private ApexReadUnboundedInputOperator() { - this.pipelineOptions = null; this.source = null; + this.pipelineOptions = null; this.source = null; this.isBoundedSource = false; } @Override public void beginWindow(long windowId) { - if (!available && source instanceof ValuesSource) { + if (!available && (isBoundedSource || source instanceof ValuesSource)) { // if it's a Create and the input was consumed, emit final watermark emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()); // terminate the stream (allows tests to finish faster) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java index 6ab2e8e..363e669 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java @@ -91,7 +91,6 @@ public class StreamingWordCountTest { ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(ApexPipelineOptions.class); options.setApplicationName("StreamingWordCount"); - options.setParallelism(1); Pipeline p = Pipeline.create(options); PCollection<KV<String, Long>> wordCounts = @@ -110,7 +109,7 @@ public class StreamingWordCountTest { && FormatAsStringFn.RESULTS.containsKey("bar")) { break; } - Thread.sleep(1000); + result.waitUntilFinish(Duration.millis(1000)); } result.cancel(); Assert.assertTrue( @@ -118,4 +117,5 @@ public class StreamingWordCountTest { FormatAsStringFn.RESULTS.clear(); } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java index 6f50398..2379a9e 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java @@ -192,7 +192,7 @@ public class ParDoBoundTranslatorTest { Pipeline pipeline = Pipeline.create(options); PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3); - // TODO: good candidate to terminate fast based on processed assertion vs. for auto-shutdown + // TODO: terminate faster based on processed assertion vs. auto-shutdown pipeline.run(); } @@ -263,8 +263,8 @@ public class ParDoBoundTranslatorTest { Pipeline pipeline = Pipeline.create(options); List<Integer> inputs = Arrays.asList(3, -42, 666); - final TupleTag<String> mainOutputTag = new TupleTag<String>("main"); - final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput"); + final TupleTag<String> mainOutputTag = new TupleTag<>("main"); + final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput"); PCollectionView<Integer> sideInput1 = pipeline .apply("CreateSideInput1", Create.of(11))