Repository: spark Updated Branches: refs/heads/branch-2.1 b0a73c9be -> 406f33987
[SPARK-18361][PYSPARK] Expose RDD localCheckpoint in PySpark ## What changes were proposed in this pull request? Expose RDD's localCheckpoint() and associated functions in PySpark. ## How was this patch tested? I added a unit test in python/pyspark/tests.py, which passes. I certify that this is my original work, and I license it to the project under the project's open source license. Gabriel HUANG Developer at Cardabel (http://cardabel.com/) Author: Gabriel Huang <gabi.xiaohu...@gmail.com> Closes #15811 from gabrielhuang/pyspark-localcheckpoint. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/406f3398 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/406f3398 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/406f3398 Branch: refs/heads/branch-2.1 Commit: 406f33987ac078fb20d2f5e81b7e1f646ea53fed Parents: b0a73c9 Author: Gabriel Huang <gabi.xiaohu...@gmail.com> Authored: Mon Nov 21 16:08:34 2016 -0500 Committer: Andrew Or <andrewo...@gmail.com> Committed: Mon Nov 21 16:16:59 2016 -0500 ---------------------------------------------------------------------- python/pyspark/rdd.py | 33 ++++++++++++++++++++++++++++++++- python/pyspark/tests.py | 17 +++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/406f3398/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 641787e..f21a364 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -263,13 +263,44 @@ class RDD(object): def isCheckpointed(self): """ - Return whether this RDD has been checkpointed or not + Return whether this RDD is checkpointed and materialized, either reliably or locally.
""" return self._jrdd.rdd().isCheckpointed() + def localCheckpoint(self): + """ + Mark this RDD for local checkpointing using Spark's existing caching layer. + + This method is for users who wish to truncate RDD lineages while skipping the expensive + step of replicating the materialized data in a reliable distributed file system. This is + useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX). + + Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed + data is written to ephemeral local storage in the executors instead of to a reliable, + fault-tolerant storage. The effect is that if an executor fails during the computation, + the checkpointed data may no longer be accessible, causing an irrecoverable job failure. + + This is NOT safe to use with dynamic allocation, which removes executors along + with their cached blocks. If you must use both features, you are advised to set + L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value. + + The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used. + """ + self._jrdd.rdd().localCheckpoint() + + def isLocallyCheckpointed(self): + """ + Return whether this RDD is marked for local checkpointing. + + Exposed for testing. + """ + return self._jrdd.rdd().isLocallyCheckpointed() + def getCheckpointFile(self): """ Gets the name of the file to which this RDD was checkpointed + + Not defined if RDD is checkpointed locally. 
""" checkpointFile = self._jrdd.rdd().getCheckpointFile() if checkpointFile.isDefined(): http://git-wip-us.apache.org/repos/asf/spark/blob/406f3398/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 3e0bd16..ab4bef8 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -390,6 +390,23 @@ class CheckpointTests(ReusedPySparkTestCase): self.assertEqual([1, 2, 3, 4], recovered.collect()) +class LocalCheckpointTests(ReusedPySparkTestCase): + + def test_basic_localcheckpointing(self): + parCollection = self.sc.parallelize([1, 2, 3, 4]) + flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1)) + + self.assertFalse(flatMappedRDD.isCheckpointed()) + self.assertFalse(flatMappedRDD.isLocallyCheckpointed()) + + flatMappedRDD.localCheckpoint() + result = flatMappedRDD.collect() + time.sleep(1) # 1 second + self.assertTrue(flatMappedRDD.isCheckpointed()) + self.assertTrue(flatMappedRDD.isLocallyCheckpointed()) + self.assertEqual(flatMappedRDD.collect(), result) + + class AddFileTests(PySparkTestCase): def test_add_py_file(self): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org