Repository: beam Updated Branches: refs/heads/release-2.1.0 4e281f699 -> e0acb18b0
Move DirectRunner knob for suppressing runner-determined sharding out of core SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ec6ccae9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ec6ccae9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ec6ccae9 Branch: refs/heads/release-2.1.0 Commit: ec6ccae94ae96254791cd990e4f395eccbe9dcaf Parents: 4e281f6 Author: Kenneth Knowles <k...@google.com> Authored: Fri Jul 7 08:49:08 2017 -0700 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Sun Jul 9 07:14:10 2017 +0200 ---------------------------------------------------------------------- runners/direct-java/pom.xml | 2 +- .../beam/runners/direct/DirectRegistrar.java | 2 +- .../beam/runners/direct/DirectRunner.java | 5 +-- .../beam/runners/direct/DirectTestOptions.java | 42 ++++++++++++++++++++ .../runners/direct/DirectRegistrarTest.java | 2 +- .../beam/sdk/testing/TestPipelineOptions.java | 10 ----- 6 files changed, 47 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ec6ccae9/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 5b5aec2..e56dcd2 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -156,7 +156,7 @@ <beamTestPipelineOptions> [ "--runner=DirectRunner", - "--unitTest" + "--runnerDeterminedSharding=false" ] </beamTestPipelineOptions> </systemPropertyVariables> http://git-wip-us.apache.org/repos/asf/beam/blob/ec6ccae9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java index 0e6fbab..53fb2f2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java @@ -50,7 +50,7 @@ public class DirectRegistrar { @Override public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { return ImmutableList.<Class<? extends PipelineOptions>>of( - DirectOptions.class); + DirectOptions.class, DirectTestOptions.class); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/ec6ccae9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index a16e24d..7a221c4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -43,7 +43,6 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformOverride; -import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; @@ -222,9 +221,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { @SuppressWarnings("rawtypes") @VisibleForTesting List<PTransformOverride> defaultTransformOverrides() { - TestPipelineOptions testOptions = options.as(TestPipelineOptions.class); + DirectTestOptions testOptions = options.as(DirectTestOptions.class); ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder(); - if (!testOptions.isUnitTest()) { + if (testOptions.isRunnerDeterminedSharding()) { builder.add( PTransformOverride.of( PTransformMatchers.writeWithRunnerDeterminedSharding(), http://git-wip-us.apache.org/repos/asf/beam/blob/ec6ccae9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java new file mode 100644 index 0000000..a426443 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Hidden; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Internal-only options for tweaking the behavior of the {@link DirectRunner} in ways that users + * should never do. + * + * <p>Currently, the only use is to disable user-friendly overrides that prevent fully testing + * certain composite transforms. + */ +@Internal +@Hidden +public interface DirectTestOptions extends PipelineOptions, ApplicationNameOptions { + @Default.Boolean(true) + @Description( + "Indicates whether this is an automatically-run unit test.") + boolean isRunnerDeterminedSharding(); + void setRunnerDeterminedSharding(boolean goAheadAndDetermineSharding); +} http://git-wip-us.apache.org/repos/asf/beam/blob/ec6ccae9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java index 603e43e..4b909bc 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java @@ -37,7 +37,7 @@ public class DirectRegistrarTest { @Test public void testCorrectOptionsAreReturned() { assertEquals( - ImmutableList.of(DirectOptions.class), + ImmutableList.of(DirectOptions.class, DirectTestOptions.class), new Options().getPipelineOptions()); } http://git-wip-us.apache.org/repos/asf/beam/blob/ec6ccae9/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java index 904f3a2..206bc1f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java @@ -20,10 +20,8 @@ package org.apache.beam.sdk.testing; import com.fasterxml.jackson.annotation.JsonIgnore; import javax.annotation.Nullable; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -52,14 +50,6 @@ public interface TestPipelineOptions extends PipelineOptions { Long getTestTimeoutSeconds(); void setTestTimeoutSeconds(Long value); - @Default.Boolean(false) - @Internal - @Hidden - @org.apache.beam.sdk.options.Description( - "Indicates whether this is an automatically-run unit test.") - boolean isUnitTest(); - void setUnitTest(boolean unitTest); - /** * Factory for {@link PipelineResult} matchers which always pass. */