Repository: spark
Updated Branches:
  refs/heads/branch-1.0 2693035ba -> e0bc72eb7


[SPARK-791] [PySpark] fix pickle itemgetter with cloudpickle

Fix the problem with pickling operator.itemgetter objects that have multiple indices.

Author: Davies Liu <davies....@gmail.com>

Closes #1627 from davies/itemgetter and squashes the following commits:

aabd7fa [Davies Liu] fix pickle itemgetter with cloudpickle

(cherry picked from commit 92ef02626e793ea853cced4cbfee316f0b748ed7)
Signed-off-by: Josh Rosen <joshro...@apache.org>

Conflicts:
        python/pyspark/tests.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0bc72eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0bc72eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0bc72eb

Branch: refs/heads/branch-1.0
Commit: e0bc72eb7238f5ea7dc075049dea6da9170c1998
Parents: 2693035
Author: Davies Liu <davies....@gmail.com>
Authored: Tue Jul 29 01:02:18 2014 -0700
Committer: Josh Rosen <joshro...@apache.org>
Committed: Tue Jul 29 01:04:32 2014 -0700

----------------------------------------------------------------------
 python/pyspark/cloudpickle.py | 5 +++--
 python/pyspark/tests.py       | 6 ++++++
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0bc72eb/python/pyspark/cloudpickle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index eb5dbb8..ad2d35a 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -560,8 +560,9 @@ class CloudPickler(pickle.Pickler):
             ]
 
 
-        itemgetter_obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
-        return self.save_reduce(operator.itemgetter, (itemgetter_obj.item,))
+        obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
+        return self.save_reduce(operator.itemgetter,
+                obj.item if obj.nitems > 1 else (obj.item,))
 
     if PyObject_HEAD:
         dispatch[operator.itemgetter] = save_itemgetter

http://git-wip-us.apache.org/repos/asf/spark/blob/e0bc72eb/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 2ea0554..45284ee 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -198,6 +198,12 @@ class TestRDDFunctions(PySparkTestCase):
         os.unlink(tempFile.name)
         self.assertRaises(Exception, lambda: filtered_data.count())
 
+    def test_itemgetter(self):
+        rdd = self.sc.parallelize([range(10)])
+        from operator import itemgetter
+        self.assertEqual([1], rdd.map(itemgetter(1)).collect())
+        self.assertEqual([(2, 3)], rdd.map(itemgetter(2, 3)).collect())
+
 
 class TestIO(PySparkTestCase):
 

Reply via email to