Repository: spark
Updated Branches:
  refs/heads/branch-1.5 00f812d38 -> b40059dbd


[SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local 
checkpoint paths and existing SparkContexts

The current code only checks checkpoint files in the local filesystem, and always 
tries to create a new Python SparkContext (even if one already exists). The 
solution is to do the following:
1. Use the same code path as Java to check whether a valid checkpoint exists.
2. Create a new Python SparkContext only if there is no active one.

There is no test for the path, as it's hard to test with distributed filesystem 
paths in a local unit test. I am going to test it with a distributed file 
system manually to verify that this patch works.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #8366 from tdas/SPARK-10142 and squashes the following commits:

3afa666 [Tathagata Das] Added tests
2dd4ae5 [Tathagata Das] Added the check to not create a context if one already 
exists
9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the 
checkpoint files

(cherry picked from commit 053d94fcf32268369b5a40837271f15d6af41aa4)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b40059db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b40059db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b40059db

Branch: refs/heads/branch-1.5
Commit: b40059dbda4dafbb883a53fbd5c5f69bc01a3e19
Parents: 00f812d
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Sun Aug 23 19:24:32 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sun Aug 23 19:24:42 2015 -0700

----------------------------------------------------------------------
 python/pyspark/streaming/context.py             | 22 ++++++----
 python/pyspark/streaming/tests.py               | 43 ++++++++++++++++----
 .../org/apache/spark/streaming/Checkpoint.scala |  9 ++++
 3 files changed, 58 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b40059db/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py 
b/python/pyspark/streaming/context.py
index e3ba70e..4069d7a 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -150,26 +150,30 @@ class StreamingContext(object):
         @param checkpointPath: Checkpoint directory used in an earlier 
streaming program
         @param setupFunc:      Function to create a new context and setup 
DStreams
         """
-        # TODO: support checkpoint in HDFS
-        if not os.path.exists(checkpointPath) or not 
os.listdir(checkpointPath):
+        cls._ensure_initialized()
+        gw = SparkContext._gateway
+
+        # Check whether valid checkpoint information exists in the given path
+        if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty():
             ssc = setupFunc()
             ssc.checkpoint(checkpointPath)
             return ssc
 
-        cls._ensure_initialized()
-        gw = SparkContext._gateway
-
         try:
             jssc = gw.jvm.JavaStreamingContext(checkpointPath)
         except Exception:
             print("failed to load StreamingContext from checkpoint", 
file=sys.stderr)
             raise
 
-        jsc = jssc.sparkContext()
-        conf = SparkConf(_jconf=jsc.getConf())
-        sc = SparkContext(conf=conf, gateway=gw, jsc=jsc)
+        # If there is already an active instance of Python SparkContext use 
it, or create a new one
+        if not SparkContext._active_spark_context:
+            jsc = jssc.sparkContext()
+            conf = SparkConf(_jconf=jsc.getConf())
+            SparkContext(conf=conf, gateway=gw, jsc=jsc)
+
+        sc = SparkContext._active_spark_context
+
         # update ctx in serializer
-        SparkContext._active_spark_context = sc
         cls._transformerSerializer.ctx = sc
         return StreamingContext(sc, None, jssc)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b40059db/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index 214d5be..510a4f2 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -603,6 +603,10 @@ class CheckpointTests(unittest.TestCase):
     def tearDown(self):
         if self.ssc is not None:
             self.ssc.stop(True)
+        if self.sc is not None:
+            self.sc.stop()
+        if self.cpd is not None:
+            shutil.rmtree(self.cpd)
 
     def test_get_or_create_and_get_active_or_create(self):
         inputd = tempfile.mkdtemp()
@@ -622,8 +626,12 @@ class CheckpointTests(unittest.TestCase):
             self.setupCalled = True
             return ssc
 
-        cpd = tempfile.mkdtemp("test_streaming_cps")
-        self.ssc = StreamingContext.getOrCreate(cpd, setup)
+        # Verify that getOrCreate() calls setup() in absence of checkpoint 
files
+        self.cpd = tempfile.mkdtemp("test_streaming_cps")
+        self.setupCalled = False
+        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
+        self.assertFalse(self.setupCalled)
+
         self.ssc.start()
 
         def check_output(n):
@@ -660,31 +668,52 @@ class CheckpointTests(unittest.TestCase):
         self.ssc.stop(True, True)
         time.sleep(1)
         self.setupCalled = False
-        self.ssc = StreamingContext.getOrCreate(cpd, setup)
+        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
         self.assertFalse(self.setupCalled)
         self.ssc.start()
         check_output(3)
 
+        # Verify that getOrCreate() uses existing SparkContext
+        self.ssc.stop(True, True)
+        time.sleep(1)
+        sc = SparkContext(SparkConf())
+        self.setupCalled = False
+        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
+        self.assertFalse(self.setupCalled)
+        self.assertTrue(self.ssc.sparkContext == sc)
+
         # Verify the getActiveOrCreate() recovers from checkpoint files
         self.ssc.stop(True, True)
         time.sleep(1)
         self.setupCalled = False
-        self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
+        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
         self.assertFalse(self.setupCalled)
         self.ssc.start()
         check_output(4)
 
         # Verify that getActiveOrCreate() returns active context
         self.setupCalled = False
-        self.assertEquals(StreamingContext.getActiveOrCreate(cpd, setup), 
self.ssc)
+        self.assertEquals(StreamingContext.getActiveOrCreate(self.cpd, setup), 
self.ssc)
         self.assertFalse(self.setupCalled)
 
+        # Verify that getActiveOrCreate() uses existing SparkContext
+        self.ssc.stop(True, True)
+        time.sleep(1)
+        self.sc = SparkContext(SparkConf())
+        self.setupCalled = False
+        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
+        self.assertFalse(self.setupCalled)
+        self.assertTrue(self.ssc.sparkContext == sc)
+
         # Verify that getActiveOrCreate() calls setup() in absence of 
checkpoint files
         self.ssc.stop(True, True)
-        shutil.rmtree(cpd)  # delete checkpoint directory
+        shutil.rmtree(self.cpd)  # delete checkpoint directory
+        time.sleep(1)
         self.setupCalled = False
-        self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
+        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
         self.assertTrue(self.setupCalled)
+
+        # Stop everything
         self.ssc.stop(True, True)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b40059db/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 6f6b449..cd5d960 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -289,6 +289,15 @@ object CheckpointReader extends Logging {
   /**
    * Read checkpoint files present in the given checkpoint directory. If there 
are no checkpoint
    * files, then return None, else try to return the latest valid checkpoint 
object. If no
+   * checkpoint files could be read correctly, then return None.
+   */
+  def read(checkpointDir: String): Option[Checkpoint] = {
+    read(checkpointDir, new SparkConf(), SparkHadoopUtil.get.conf, 
ignoreReadError = true)
+  }
+
+  /**
+   * Read checkpoint files present in the given checkpoint directory. If there 
are no checkpoint
+   * files, then return None, else try to return the latest valid checkpoint 
object. If no
    * checkpoint files could be read correctly, then return None (if 
ignoreReadError = true),
    * or throw exception (if ignoreReadError = false).
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to