Repository: beam Updated Branches: refs/heads/master 3f0fe91aa -> faa9645d2
[BEAM-1752, BEAM-1582] execute tests that recover from checkpoint in post-commit. Add UsesCheckpointRecovery tag to mark Spark runner tests that recover from checkpoint. Mark ResumeFromCheckpointStreamingTest with UsesCheckpointRecovery and execute on PostCommit as part of the Spark runner ROS profile. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/158e10ec Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/158e10ec Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/158e10ec Branch: refs/heads/master Commit: 158e10ecf21cf3f2f18699454e8315195316a17c Parents: 3f0fe91 Author: Amit Sela <amitsel...@gmail.com> Authored: Sun Mar 19 15:01:55 2017 +0200 Committer: Amit Sela <amitsel...@gmail.com> Committed: Sun Mar 19 19:55:17 2017 +0200 ---------------------------------------------------------------------- runners/spark/pom.xml | 8 ++++++- .../runners/spark/UsesCheckpointRecovery.java | 23 ++++++++++++++++++++ .../ResumeFromCheckpointStreamingTest.java | 3 +++ 3 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/158e10ec/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 10fa946..6958b28 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -72,7 +72,10 @@ <goal>test</goal> </goals> <configuration> - <groups>org.apache.beam.sdk.testing.RunnableOnService</groups> + <groups> + org.apache.beam.sdk.testing.RunnableOnService, + org.apache.beam.runners.spark.UsesCheckpointRecovery + </groups> <excludedGroups> org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, @@ -349,6 +352,9 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> + <excludedGroups> + org.apache.beam.runners.spark.UsesCheckpointRecovery + </excludedGroups> <forkCount>1</forkCount> <reuseForks>false</reuseForks> <systemPropertyVariables> http://git-wip-us.apache.org/repos/asf/beam/blob/158e10ec/runners/spark/src/test/java/org/apache/beam/runners/spark/UsesCheckpointRecovery.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/UsesCheckpointRecovery.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/UsesCheckpointRecovery.java new file mode 100644 index 0000000..da63d3e --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/UsesCheckpointRecovery.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark; + +/** + * Category tag for tests that validate Spark checkpoint recovery. + */ +public interface UsesCheckpointRecovery {} http://git-wip-us.apache.org/repos/asf/beam/blob/158e10ec/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index ce502d6..5c1963d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -38,6 +38,7 @@ import org.apache.beam.runners.spark.PipelineRule; import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.runners.spark.TestSparkPipelineOptions; +import org.apache.beam.runners.spark.UsesCheckpointRecovery; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; @@ -82,6 +83,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; /** @@ -140,6 +142,7 @@ public class ResumeFromCheckpointStreamingTest { } @Test + @Category(UsesCheckpointRecovery.class) public void testWithResume() throws Exception { // write to Kafka produce(ImmutableMap.of(