This is an automated email from the ASF dual-hosted git repository. scott pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new b7035c1 [BEAM-6225] Setup Jenkins Job to Run VR with ExecutableStage (#7271) b7035c1 is described below commit b7035c1a098c526356c6fb33480b989ed037da0a Author: Boyuan Zhang <36090911+boyua...@users.noreply.github.com> AuthorDate: Fri Dec 14 13:47:48 2018 -0800 [BEAM-6225] Setup Jenkins Job to Run VR with ExecutableStage (#7271) --- ...unner_DataflowPortabilityExecutableStage.groovy | 54 ++++++++++++++++++++++ runners/google-cloud-dataflow-java/build.gradle | 48 +++++++++++++++++-- ...aflowPortabilityExecutableStageUnsupported.java | 25 ++++++++++ .../apache/beam/sdk/testing/UsesSideInputs.java | 24 ++++++++++ .../org/apache/beam/sdk/testing/PAssertTest.java | 12 ++--- .../apache/beam/sdk/transforms/CombineTest.java | 30 ++++++------ .../apache/beam/sdk/transforms/FlattenTest.java | 3 +- .../apache/beam/sdk/transforms/GroupByKeyTest.java | 11 +++-- .../org/apache/beam/sdk/transforms/ParDoTest.java | 17 +++---- .../apache/beam/sdk/transforms/ReshuffleTest.java | 11 +++-- .../beam/sdk/transforms/SplittableDoFnTest.java | 10 +++- .../beam/sdk/transforms/join/CoGroupByKeyTest.java | 8 ++-- .../beam/sdk/transforms/windowing/WindowTest.java | 5 +- .../sdk/transforms/windowing/WindowingTest.java | 9 ++-- 14 files changed, 212 insertions(+), 55 deletions(-) diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy new file mode 100644 index 0000000..62e7361 --- /dev/null +++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import PostcommitJobBuilder + + +// This job runs the suite of ValidatesRunner tests against the Dataflow +// runner. +PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage', + 'Run Dataflow Portability ExecutableStage ValidatesRunner', 'Google Cloud Dataflow Runner PortabilityApi ExecutableStage ValidatesRunner Tests', this) { + + description('Runs the ValidatesRunner suite on the Dataflow PortabilityApi runner with ExecutableStage code path enabled.') + + // Set common parameters. Sets a 3 hour timeout. + commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 400) + + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + + // Gradle goals for this job. + steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + tasks(':beam-runners-google-cloud-dataflow-java:validatesRunnerFnApiWorkerExecutableStageTest') + // Increase parallel worker threads above processor limit since most time is + // spent waiting on Dataflow jobs. ValidatesRunner tests on Dataflow are slow + // because each one launches a Dataflow job with about 3 mins of overhead. + // 3 x num_cores strikes a good balance between maxing out parallelism without + // overloading the machines. 
+ commonJobProperties.setGradleSwitches(delegate, 3 * Runtime.runtime.availableProcessors()) + } + } + + // [BEAM-6236] "use_executable_stage_bundle_execution" hasn't been rolled out. + disabled() +} diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index c0b831c..9c6aaf4 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -241,17 +241,55 @@ task validatesRunnerFnApiWorkerTest(type: Test) { } } +task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) { + group = "Verification" + description "Validates Dataflow PortabilityApi runner" + dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar" + dependsOn buildAndPushDockerContainer + + systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ + "--runner=TestDataflowRunner", + "--project=${dataflowProject}", + "--tempRoot=${dataflowPostCommitTempRoot}", + "--dataflowWorkerJar=${dataflowFnApiWorkerJar}", + "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}", + "--experiments=beam_fn_api,use_executable_stage_bundle_execution"] + ) + + // Increase test parallelism up to the number of Gradle workers. By default this is equal + // to the number of CPU cores, but can be increased by setting --max-workers=N. + maxParallelForks Integer.MAX_VALUE + classpath = configurations.validatesRunner + testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs) + // TODO(BEAM-6232): ViewTest tests sideinputs, which are not supported by the current bundle execution. + exclude '**/ViewTest.class' + useJUnit { + includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' + commonExcludeCategories.each { + excludeCategories it + } + fnApiWorkerExcludeCategories.each { + excludeCategories it + } + // TODO(BEAM-6232): Support sideinput.
+ excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs' + // TODO(BEAM-6233): Support timer and state. + excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo' + // TODO(BEAM-6231): Triage failures. + excludeCategories 'org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported' + } +} + task validatesRunner { group = "Verification" description "Validates Dataflow runner" dependsOn validatesRunnerLegacyWorkerTest } -task validatesRunnerPortabilityApi { - group = "Verification" - description "Validates Dataflow PortabilityApi runner" - dependsOn validatesRunnerFnApiWorkerTest - dependsOn buildAndPushDockerContainer +task validatesRunnerPortabilityApiExecutableStage { + group = "Verification" + description "Validates Dataflow PortabilityApi runner" + dependsOn validatesRunnerFnApiWorkerExecutableStageTest } task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java new file mode 100644 index 0000000..a14885f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which are not supported by the Dataflow portable worker with + * use_executable_stage_bundle_execution and which need further investigation. + */ +// TODO(BEAM-6231): Triage test failures introduced by using ExecutableStage. +public interface DataflowPortabilityExecutableStageUnsupported {} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java new file mode 100644 index 0000000..907c13e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which use sideinputs. Tests tagged with {@link UsesSideInputs} + * should be run for runners which support sideinputs. + */ +public interface UsesSideInputs {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index bf7ee69..160f709 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -297,7 +297,7 @@ public class PAssertTest implements Serializable { /** Basic test for {@code isEqualTo}. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testIsEqualTo() throws Exception { PCollection<Integer> pcollection = pipeline.apply(Create.of(43)); PAssert.thatSingleton(pcollection).isEqualTo(43); @@ -306,7 +306,7 @@ public class PAssertTest implements Serializable { /** Basic test for {@code isEqualTo}. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowedIsEqualTo() throws Exception { PCollection<Integer> pcollection = pipeline @@ -326,7 +326,7 @@ public class PAssertTest implements Serializable { /** Basic test for {@code notEqualTo}. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testNotEqualTo() throws Exception { PCollection<Integer> pcollection = pipeline.apply(Create.of(43)); PAssert.thatSingleton(pcollection).notEqualTo(42); @@ -335,7 +335,7 @@ public class PAssertTest implements Serializable { /** Test that we throw an error for false assertion on singleton. 
*/ @Test - @Category({ValidatesRunner.class, UsesFailureMessage.class}) + @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class}) public void testPAssertEqualsSingletonFalse() throws Exception { PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); PAssert.thatSingleton("The value was not equal to 44", pcollection).isEqualTo(44); @@ -351,7 +351,7 @@ public class PAssertTest implements Serializable { /** Test that we throw an error for false assertion on singleton. */ @Test - @Category({ValidatesRunner.class, UsesFailureMessage.class}) + @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class}) public void testPAssertEqualsSingletonFalseDefaultReasonString() throws Exception { PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); PAssert.thatSingleton(pcollection).isEqualTo(44); @@ -385,7 +385,7 @@ public class PAssertTest implements Serializable { /** Tests that windowed {@code containsInAnyOrder} is actually order-independent. 
*/ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowedContainsInAnyOrder() throws Exception { PCollection<Integer> pcollection = pipeline diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 2d825dc..21f7cda 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -58,9 +58,11 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineTest.SharedTestBase.TestCombineFn.Accumulator; @@ -662,7 +664,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testBasicCombine() { runTestBasicCombine( Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)), @@ -673,7 +675,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testBasicCombineEmpty() { runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList()); } @@ -950,7 +952,7 @@ public class CombineTest implements Serializable { 
@RunWith(JUnit4.class) public static class CombineWithContextTests extends SharedTestBase { @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombineWithContext() { runTestSimpleCombineWithContext( @@ -961,7 +963,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testSimpleCombineWithContextEmpty() { runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {}); } @@ -1022,7 +1024,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testFixedWindowsCombineWithContext() { PCollection<KV<String, Integer>> perKeyInput = pipeline @@ -1064,7 +1066,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testSlidingWindowsCombine() { PCollection<String> input = pipeline @@ -1123,7 +1125,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testSlidingWindowsCombineWithContext() { // [a: 1, 1], [a: 4; b: 1], [b: 13] PCollection<KV<String, Integer>> perKeyInput = @@ -1174,7 +1176,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testGlobalCombineWithDefaultsAndTriggers() { PCollection<Integer> input = pipeline.apply(Create.of(1, 1)); @@ -1201,7 +1203,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, 
DataflowPortabilityExecutableStageUnsupported.class}) public void testSessionsCombine() { PCollection<KV<String, Integer>> input = pipeline @@ -1227,7 +1229,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testSessionsCombineWithContext() { PCollection<KV<String, Integer>> perKeyInput = pipeline.apply( @@ -1292,7 +1294,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCombineGloballyAsSingletonView() { final PCollectionView<Integer> view = pipeline @@ -1318,7 +1320,7 @@ public class CombineTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowedCombineGloballyAsSingletonView() { FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1)); final PCollectionView<Integer> view = @@ -1362,7 +1364,7 @@ public class CombineTest implements Serializable { /** Tests creation of a global {@link Combine} via Java 8 lambda. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCombineGloballyLambda() { PCollection<Integer> output = @@ -1384,7 +1386,7 @@ public class CombineTest implements Serializable { /** Tests creation of a global {@link Combine} via a Java 8 method reference. 
*/ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCombineGloballyInstanceMethodReference() { PCollection<Integer> output = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index aa25136..693598b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Sessions; @@ -184,7 +185,7 @@ public class FlattenTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testEmptyFlattenAsSideInput() { final PCollectionView<Iterable<String>> view = PCollectionList.<String>empty(p) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 2049e10..c6bd909 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; 
import org.apache.beam.sdk.testing.LargeKeys; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -254,7 +255,7 @@ public class GroupByKeyTest implements Serializable { * two values. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testTimestampCombinerEarliest() { p.apply( @@ -275,7 +276,7 @@ public class GroupByKeyTest implements Serializable { * the windowing function customized to use the latest value. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testTimestampCombinerLatest() { p.apply( Create.timestamped( @@ -382,7 +383,7 @@ public class GroupByKeyTest implements Serializable { @RunWith(JUnit4.class) public static class WindowTests extends SharedTestBase { @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testGroupByKeyAndWindows() { List<KV<String, Integer>> ungroupedPairs = Arrays.asList( @@ -423,7 +424,7 @@ public class GroupByKeyTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testGroupByKeyMultipleWindows() { PCollection<KV<String, Integer>> windowedInput = p.apply( @@ -453,7 +454,7 @@ public class GroupByKeyTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testGroupByKeyMergingWindows() { PCollection<KV<String, Integer>> windowedInput = p.apply( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index dab9f13..3264bde 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -82,12 +82,14 @@ import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesMapState; import org.apache.beam.sdk.testing.UsesSetState; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.UsesStatefulParDo; import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.UsesTimersInParDo; @@ -342,7 +344,6 @@ public class ParDoTest implements Serializable { pipeline.apply(Create.of(inputs)).apply(ParDo.of(new TestDoFn())); PAssert.that(output).satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - pipeline.run(); } @@ -706,7 +707,7 @@ public class ParDoTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testParDoWithSideInputs() { List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -738,7 +739,7 @@ public class ParDoTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testParDoWithSideInputsIsCumulative() { List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -772,7 +773,7 @@ public class ParDoTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testMultiOutputParDoWithSideInputs() { List<Integer> inputs = Arrays.asList(3, -42, 
666); @@ -810,7 +811,7 @@ public class ParDoTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testMultiOutputParDoWithSideInputsIsCumulative() { List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -866,7 +867,7 @@ public class ParDoTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testSideInputsWithMultipleWindows() { // Tests that the runner can safely run a DoFn that uses side inputs // on an input where the element is in multiple windows. The complication is @@ -1186,7 +1187,7 @@ public class ParDoTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowingInStartAndFinishBundle() { final FixedWindows windowFn = FixedWindows.of(Duration.millis(1)); @@ -1961,7 +1962,7 @@ public class ParDoTest implements Serializable { } @Test - @Category({ValidatesRunner.class, UsesStatefulParDo.class}) + @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSideInputs.class}) public void testBagStateSideInput() { final PCollectionView<List<Integer>> listView = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java index 6c2c94b..5d9c7f2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.PAssert; 
import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; @@ -149,7 +150,7 @@ public class ReshuffleTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterSessionsAndGroupByKey() { PCollection<KV<String, Iterable<Integer>>> input = @@ -170,7 +171,7 @@ public class ReshuffleTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterFixedWindowsAndGroupByKey() { PCollection<KV<String, Iterable<Integer>>> input = @@ -191,7 +192,7 @@ public class ReshuffleTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterSlidingWindowsAndGroupByKey() { PCollection<KV<String, Iterable<Integer>>> input = @@ -212,7 +213,7 @@ public class ReshuffleTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterFixedWindows() { PCollection<KV<String, Integer>> input = @@ -232,7 +233,7 @@ public class ReshuffleTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterSlidingWindows() { PCollection<KV<String, Integer>> input = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 08a26cb..01e7335 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -34,12 +34,14 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesBoundedSplittableParDo; import org.apache.beam.sdk.testing.UsesParDoLifecycle; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs; import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo; @@ -152,7 +154,11 @@ public class SplittableDoFnTest implements Serializable { } @Test - @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class}) + @Category({ + ValidatesRunner.class, + UsesBoundedSplittableParDo.class, + DataflowPortabilityExecutableStageUnsupported.class + }) public void testPairWithIndexWindowedTimestampedBounded() { testPairWithIndexWindowedTimestamped(IsBounded.BOUNDED); } @@ -351,7 +357,7 @@ public class SplittableDoFnTest implements Serializable { } @Test - @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class}) + @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class}) public void testSideInputBounded() { testSideInput(IsBounded.BOUNDED); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index ff476e3..002bc4b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java 
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -32,9 +32,11 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -118,7 +120,7 @@ public class CoGroupByKeyTest implements Serializable { @Rule public final transient TestPipeline p = TestPipeline.create(); @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCoGroupByKeyGetOnly() { final TupleTag<String> tag1 = new TupleTag<>(); final TupleTag<String> tag2 = new TupleTag<>(); @@ -248,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCoGroupByKey() { final TupleTag<String> namesTag = new TupleTag<>(); final TupleTag<String> addressesTag = new TupleTag<>(); @@ -468,7 +470,7 @@ public class CoGroupByKeyTest implements Serializable { /** Tests the pipeline end-to-end with FixedWindows. 
*/ @SuppressWarnings("unchecked") @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testCoGroupByKeyWithWindowing() { TupleTag<String> clicksTag = new TupleTag<>(); TupleTag<String> purchasesTag = new TupleTag<>(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 6b1be69..5345213 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesCustomWindowMerging; @@ -439,7 +440,7 @@ public class WindowTest implements Serializable { * the windowing function default, the end of the window. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testTimestampCombinerDefault() { pipeline.enableAbandonedNodeEnforcement(true); @@ -473,7 +474,7 @@ public class WindowTest implements Serializable { * the windowing function customized to use the end of the window. 
*/ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testTimestampCombinerEndOfWindow() { pipeline.enableAbandonedNodeEnforcement(true); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index 88bf613..8b8faf8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -25,6 +25,7 @@ import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -103,7 +104,7 @@ public class WindowingTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testPartitioningWindowing() { PCollection<String> input = p.apply( @@ -127,7 +128,7 @@ public class WindowingTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testNonPartitioningWindowing() { PCollection<String> input = p.apply( @@ -151,7 +152,7 @@ public class WindowingTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testMergingWindowing() { PCollection<String> input = p.apply( @@ -169,7 +170,7 @@ public class WindowingTest implements Serializable { } @Test - @Category(ValidatesRunner.class) + 
@Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowPreservation() { PCollection<String> input1 = p.apply(