Repository: spark
Updated Branches:
  refs/heads/master 59236e5c5 -> 90ca18448


[SPARK-14418][PYSPARK] fix unpersist of Broadcast in Python

## What changes were proposed in this pull request?

Currently, Broadcast.unpersist() removes the broadcast's backing file, which
should be the behavior of destroy().

This PR adds destroy() for Broadcast in Python, to match the semantics in Scala.
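The distinction can be sketched with a toy, self-contained model of the two
calls (illustrative only, not the real PySpark implementation): unpersist()
drops cached copies but keeps the backing file so the value can be re-fetched,
while destroy() removes data and metadata and makes the variable unusable.

```python
import os
import tempfile

class ToyBroadcast:
    """Toy model of the unpersist()/destroy() semantics after SPARK-14418."""
    def __init__(self, value):
        # The driver keeps the value in a local file, like the real _path.
        fd, self._path = tempfile.mkstemp()
        with os.fdopen(fd, "w") as f:
            f.write(repr(value))
        self._cached = value       # stands in for executor-side cached copies
        self._destroyed = False

    @property
    def value(self):
        if self._destroyed:
            raise Exception("broadcast was destroyed")
        if self._cached is None:   # re-fetch from the file after unpersist()
            with open(self._path) as f:
                self._cached = eval(f.read())
        return self._cached

    def unpersist(self):
        # Drop cached copies only; the backing file survives, so the value
        # can be re-sent on next use (the bug was deleting the file here).
        self._cached = None

    def destroy(self):
        # Remove data *and* metadata; the broadcast cannot be used again.
        self._destroyed = True
        os.unlink(self._path)

b = ToyBroadcast([1, 2, 3])
b.unpersist()
assert b.value == [1, 2, 3]        # still usable after unpersist()
b.destroy()
try:
    b.value
    raise AssertionError("should have failed")
except Exception:
    pass                           # using the broadcast after destroy() fails
```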

## How was this patch tested?

Added regression tests.

Author: Davies Liu <dav...@databricks.com>

Closes #12189 from davies/py_unpersist.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90ca1844
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90ca1844
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90ca1844

Branch: refs/heads/master
Commit: 90ca1844865baf96656a9e5efdf56f415f2646be
Parents: 59236e5
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Apr 6 10:46:34 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Apr 6 10:46:34 2016 -0700

----------------------------------------------------------------------
 python/pyspark/broadcast.py | 17 ++++++++++++++++-
 python/pyspark/tests.py     | 15 +++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90ca1844/python/pyspark/broadcast.py
----------------------------------------------------------------------
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 663c9ab..a0b8192 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -99,11 +99,26 @@ class Broadcast(object):
 
     def unpersist(self, blocking=False):
         """
-        Delete cached copies of this broadcast on the executors.
+        Delete cached copies of this broadcast on the executors. If the
+        broadcast is used after this is called, it will need to be
+        re-sent to each executor.
+
+        :param blocking: Whether to block until unpersisting has completed
         """
         if self._jbroadcast is None:
             raise Exception("Broadcast can only be unpersisted in driver")
         self._jbroadcast.unpersist(blocking)
+
+    def destroy(self):
+        """
+        Destroy all data and metadata related to this broadcast variable.
+        Use this with caution; once a broadcast variable has been destroyed,
+        it cannot be used again. This method blocks until destroy has
+        completed.
+        """
+        if self._jbroadcast is None:
+            raise Exception("Broadcast can only be destroyed in driver")
+        self._jbroadcast.destroy()
         os.unlink(self._path)
 
     def __reduce__(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/90ca1844/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 40fccb8..15c87e2 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -694,6 +694,21 @@ class RDDTests(ReusedPySparkTestCase):
         m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
         self.assertEqual(N, m)
 
+    def test_unpersist(self):
+        N = 1000
+        data = [[float(i) for i in range(300)] for i in range(N)]
+        bdata = self.sc.broadcast(data)  # 3MB
+        bdata.unpersist()
+        m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
+        self.assertEqual(N, m)
+        bdata.destroy()
+        try:
+            self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
+        except Exception as e:
+            pass
+        else:
+            raise Exception("job should fail after destroying the broadcast")
+
     def test_multiple_broadcasts(self):
         N = 1 << 21
         b1 = self.sc.broadcast(set(range(N)))  # multiple blocks in JVM

