[BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a75d1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a75d1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a75d1f

Branch: refs/heads/apex-runner
Commit: 90a75d1fb0706ec4cc25a9eeeca8ade1b3b7de28
Parents: 46fbfe0
Author: Sela <ans...@paypal.com>
Authored: Thu Nov 3 18:22:20 2016 +0200
Committer: Sela <ans...@paypal.com>
Committed: Fri Nov 4 23:59:40 2016 +0200

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  3 +--
 .../SparkRunnerStreamingContextFactory.java     | 23 +++++---------------
 .../streaming/EmptyStreamAssertionTest.java     |  3 +--
 .../streaming/FlattenStreamingTest.java         |  6 ++---
 .../streaming/KafkaStreamingTest.java           |  6 ++---
 .../ResumeFromCheckpointStreamingTest.java      |  3 +--
 .../streaming/SimpleStreamingWordCountTest.java |  3 +--
 .../utils/TestOptionsForStreaming.java          | 12 +++++-----
 8 files changed, 19 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 08e14fe..4eada35 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -77,8 +77,7 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class);
-      return "file:///tmp/" + sparkPipelineOptions.getJobName();
+      return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName();
     }
   }
 
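The new default simply derives "/tmp/<jobName>" and leaves scheme resolution to Spark; a pipeline that needs durable checkpointing can still point checkpointDir at a reliable filesystem explicitly. A minimal sketch of that, assuming the standard options plumbing (the hdfs:// path below is a made-up example, not part of this commit):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Sketch: override the default "/tmp/<jobName>" checkpoint dir with a durable location.
    SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    opts.setRunner(SparkRunner.class);
    // Illustrative path only; any reliable filesystem URI (hdfs/s3/gs) works here.
    opts.setCheckpointDir("hdfs:///beam/checkpoints/" + opts.getJobName());
    Pipeline pipeline = Pipeline.create(opts);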

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 2378788..a670f61 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,11 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -48,7 +43,7 @@ import org.slf4j.LoggerFactory;
 public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
-  private static final Iterable<String> KNOWN_RELIABLE_FS = Arrays.asList("hdfs", "s3", "gs");
+  private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
 
   private final Pipeline pipeline;
   private final SparkPipelineOptions options;
@@ -83,19 +78,11 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
 
     // set checkpoint dir.
     String checkpointDir = options.getCheckpointDir();
-    LOG.info("Checkpoint dir set to: {}", checkpointDir);
-    try {
-      // validate checkpoint dir and warn if not of a known durable filesystem.
-      URL checkpointDirUrl = new URL(checkpointDir);
-      if (!Iterables.any(KNOWN_RELIABLE_FS, Predicates.equalTo(checkpointDirUrl.getProtocol()))) {
-        LOG.warn("Checkpoint dir URL {} does not match a reliable filesystem, in case of failures "
-            + "this job may not recover properly or even at all.", checkpointDirUrl);
-      }
-    } catch (MalformedURLException e) {
-      throw new RuntimeException("Failed to form checkpoint dir URL. CheckpointDir should be in "
-          + "the form of hdfs:///path/to/dir or other reliable fs protocol, "
-              + "or file:///path/to/dir for local mode.", e);
+    if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
+      LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case "
+          + "of failures this job may not recover properly or even at all.", checkpointDir);
     }
+    LOG.info("Checkpoint dir set to: {}", checkpointDir);
     jssc.checkpoint(checkpointDir);
 
     // register listeners.
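One note on the prefix check: java.lang.String#matches anchors the pattern to the entire input, so a scheme-only pattern such as "^(hdfs|s3|gs)" is usually paired with Matcher#find (or a trailing ".*") when the goal is to recognize full URIs like hdfs:///path. A small sketch of that variant (the class and method names below are illustrative, not part of this commit):

    import java.util.regex.Pattern;

    // Sketch of a scheme check for the checkpoint dir. String#matches requires
    // the whole string to match, so a prefix-only pattern is applied via find().
    final class CheckpointDirs {
      private static final Pattern KNOWN_RELIABLE_FS = Pattern.compile("^(hdfs|s3|gs)");

      static boolean looksReliable(String checkpointDir) {
        return KNOWN_RELIABLE_FS.matcher(checkpointDir).find();
      }
    }

Under this sketch, looksReliable("hdfs:///checkpoints/job") is true while looksReliable("/tmp/job") is false.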

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 4f2a7c6..3e95b4d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -55,8 +55,7 @@ public class EmptyStreamAssertionTest implements Serializable {
 
   @Test
   public void testAssertion() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     Duration windowDuration = new Duration(options.getBatchIntervalMillis());
 
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index e6872f1..319b5e9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -61,8 +61,7 @@ public class FlattenStreamingTest {
 
   @Test
   public void testFlattenUnbounded() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =
@@ -81,8 +80,7 @@ public class FlattenStreamingTest {
 
   @Test
   public void testFlattenBoundedUnbounded() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index fe2d04e..f01059f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -74,8 +74,7 @@ public class KafkaStreamingTest {
 
   @Test
   public void testEarliest2Topics() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);
@@ -122,8 +121,7 @@ public class KafkaStreamingTest {
 
   @Test
   public void testLatest() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     //--- setup
     final String topic = "topic";
     // messages.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index ca0b668..34ffbe2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -108,8 +108,7 @@ public class ResumeFromCheckpointStreamingTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 4bc9a3d..edba507 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -62,8 +62,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
 
   @Test
   public void testFixedWindows() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     // override defaults
     options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
index d695df0..2861d9f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
@@ -19,12 +19,13 @@
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
 
-import java.io.File;
-import java.net.MalformedURLException;
+import java.io.IOException;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+
 
 
 /**
@@ -41,11 +42,10 @@ public class TestOptionsForStreaming extends ExternalResource {
     options.setTimeout(1000L);
   }
 
-  public SparkPipelineOptions withTmpCheckpointDir(File checkpointDir)
-      throws MalformedURLException {
+  public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
+      throws IOException {
     // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
-    // so need to add the missing protocol.
-    options.setCheckpointDir(checkpointDir.toURI().toURL().toString());
+    options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
     return options;
   }
 
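With this change the test call sites above pass the JUnit TemporaryFolder rule straight through, and the helper creates a per-job subfolder itself. A sketch of how a streaming test wires the two rules together, assuming a hypothetical test class and body (only the rule types and withTmpCheckpointDir come from this commit):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
    import org.apache.beam.sdk.Pipeline;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class ExampleStreamingTest {
      @Rule
      public TemporaryFolder checkpointParentDir = new TemporaryFolder();

      @Rule
      public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();

      @Test
      public void testSomething() throws Exception {
        // The helper creates <tmp>/<jobName> and sets it as the checkpoint dir.
        SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline and run it ...
      }
    }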
