[BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a75d1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a75d1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a75d1f

Branch: refs/heads/apex-runner
Commit: 90a75d1fb0706ec4cc25a9eeeca8ade1b3b7de28
Parents: 46fbfe0
Author: Sela <ans...@paypal.com>
Authored: Thu Nov 3 18:22:20 2016 +0200
Committer: Sela <ans...@paypal.com>
Committed: Fri Nov 4 23:59:40 2016 +0200

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  3 +--
 .../SparkRunnerStreamingContextFactory.java     | 23 +++++---------------
 .../streaming/EmptyStreamAssertionTest.java     |  3 +--
 .../streaming/FlattenStreamingTest.java         |  6 ++---
 .../streaming/KafkaStreamingTest.java           |  6 ++---
 .../ResumeFromCheckpointStreamingTest.java      |  3 +--
 .../streaming/SimpleStreamingWordCountTest.java |  3 +--
 .../utils/TestOptionsForStreaming.java          | 12 +++++-----
 8 files changed, 19 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 08e14fe..4eada35 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -77,8 +77,7 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
 
   class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class);
-      return "file:///tmp/" + sparkPipelineOptions.getJobName();
+      return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName();
     }
   }
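
For illustration only (not part of the patch): with the default value factory above, a pipeline that does not set a checkpointDir now gets a scheme-less "/tmp/<jobName>" path, leaving it to Spark to resolve the path against whatever filesystem it is configured with, rather than forcing the local "file://" scheme. A minimal sketch, assuming the factory stays wired as the default for getCheckpointDir(); the class name CheckpointDirDefaultExample is hypothetical:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Hypothetical example class: prints the default checkpoint dir, which after this
    // change has the form "/tmp/<jobName>" with no explicit "file://" scheme.
    public class CheckpointDirDefaultExample {
      public static void main(String[] args) {
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        System.out.println(options.getCheckpointDir());
      }
    }
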
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 2378788..a670f61 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,11 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -48,7 +43,7 @@
 import org.slf4j.LoggerFactory;
 
 public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
-  private static final Iterable<String> KNOWN_RELIABLE_FS = Arrays.asList("hdfs", "s3", "gs");
+  private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
   private final Pipeline pipeline;
   private final SparkPipelineOptions options;
@@ -83,19 +78,11 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
 
     // set checkpoint dir.
     String checkpointDir = options.getCheckpointDir();
-    LOG.info("Checkpoint dir set to: {}", checkpointDir);
-    try {
-      // validate checkpoint dir and warn if not of a known durable filesystem.
-      URL checkpointDirUrl = new URL(checkpointDir);
-      if (!Iterables.any(KNOWN_RELIABLE_FS, Predicates.equalTo(checkpointDirUrl.getProtocol()))) {
-        LOG.warn("Checkpoint dir URL {} does not match a reliable filesystem, in case of failures "
-            + "this job may not recover properly or even at all.", checkpointDirUrl);
-      }
-    } catch (MalformedURLException e) {
-      throw new RuntimeException("Failed to form checkpoint dir URL. CheckpointDir should be in "
-          + "the form of hdfs:///path/to/dir or other reliable fs protocol, "
-          + "or file:///path/to/dir for local mode.", e);
+    if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
+      LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case "
+          + "of failures this job may not recover properly or even at all.", checkpointDir);
     }
+    LOG.info("Checkpoint dir set to: {}", checkpointDir);
     jssc.checkpoint(checkpointDir);
 
     // register listeners.
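
For illustration only (not part of the patch): a minimal sketch of how a prefix check against the reliable-filesystem schemes classifies typical checkpoint locations. The class name ReliableCheckpointDirCheck is hypothetical; it uses java.util.regex lookingAt(), which tests the pattern against the start of the input (String#matches, as in the hunk above, only returns true when the entire input matches its pattern, so a prefix-style match, or a trailing ".*", is what expresses a "starts with hdfs/s3/gs" check):

    import java.util.regex.Pattern;

    // Hypothetical example class: classifies checkpoint dirs by scheme prefix.
    public class ReliableCheckpointDirCheck {

      // Same scheme alternation as KNOWN_RELIABLE_FS_PATTERN above.
      private static final Pattern RELIABLE_FS = Pattern.compile("^(hdfs|s3|gs)");

      static boolean looksReliable(String checkpointDir) {
        // lookingAt() anchors at the beginning of the input but does not require
        // the whole string to match.
        return RELIABLE_FS.matcher(checkpointDir).lookingAt();
      }

      public static void main(String[] args) {
        System.out.println(looksReliable("hdfs://namenode:8020/beam/checkpoints")); // true
        System.out.println(looksReliable("gs://my-bucket/checkpoints"));            // true
        System.out.println(looksReliable("/tmp/my-job"));                           // false -> would log the warning
        System.out.println(looksReliable("file:///tmp/my-job"));                    // false -> would log the warning
      }
    }
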
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 4f2a7c6..3e95b4d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -55,8 +55,7 @@ public class EmptyStreamAssertionTest implements Serializable {
 
   @Test
   public void testAssertion() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     Duration windowDuration = new Duration(options.getBatchIntervalMillis());
 
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index e6872f1..319b5e9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -61,8 +61,7 @@ public class FlattenStreamingTest {
 
   @Test
   public void testFlattenUnbounded() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =
@@ -81,8 +80,7 @@ public class FlattenStreamingTest {
 
   @Test
   public void testFlattenBoundedUnbounded() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index fe2d04e..f01059f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -74,8 +74,7 @@ public class KafkaStreamingTest {
 
   @Test
   public void testEarliest2Topics() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);
@@ -122,8 +121,7 @@ public class KafkaStreamingTest {
 
   @Test
   public void testLatest() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     //--- setup
     final String topic = "topic";
     // messages.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index ca0b668..34ffbe2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -108,8 +108,7 @@ public class ResumeFromCheckpointStreamingTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 4bc9a3d..edba507 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -62,8 +62,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
 
   @Test
   public void testFixedWindows() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     // override defaults
     options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
index d695df0..2861d9f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
@@ -19,12 +19,13 @@
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
-import java.io.File;
-import java.net.MalformedURLException;
+import java.io.IOException;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+
 /**
@@ -41,11 +42,10 @@ public class TestOptionsForStreaming extends ExternalResource {
     options.setTimeout(1000L);
   }
 
-  public SparkPipelineOptions withTmpCheckpointDir(File checkpointDir)
-      throws MalformedURLException {
+  public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
+      throws IOException {
     // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
-    // so need to add the missing protocol.
-    options.setCheckpointDir(checkpointDir.toURI().toURL().toString());
+    options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
     return options;
   }
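
For illustration only (not part of the patch): a minimal sketch of how the updated helper is used in the streaming tests above. The test class and method names are hypothetical, and it is assumed TestOptionsForStreaming is constructed as a plain JUnit rule as in the existing tests; the JUnit TemporaryFolder rule itself is handed to withTmpCheckpointDir, which creates a per-job subfolder underneath it and sets that as the checkpoint dir:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
    import org.apache.beam.sdk.Pipeline;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    // Hypothetical test class mirroring the pattern used by the tests in this commit.
    public class MyStreamingPipelineTest {

      @Rule
      public TemporaryFolder checkpointParentDir = new TemporaryFolder();

      @Rule
      public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();

      @Test
      public void testWithTmpCheckpointDir() throws Exception {
        // The TemporaryFolder rule is passed as-is; the helper creates
        // <tmp>/<jobName> underneath it and sets it as the checkpoint dir.
        SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
        Pipeline pipeline = Pipeline.create(options);
        // ... build and run the streaming pipeline under test ...
      }
    }
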