Repository: spark Updated Branches: refs/heads/branch-1.0 2693035ba -> e0bc72eb7
[SPARK-791] [PySpark] fix pickle itemgetter with cloudpickle fix the problem with pickling operator.itemgetter with multiple indices. Author: Davies Liu <davies....@gmail.com> Closes #1627 from davies/itemgetter and squashes the following commits: aabd7fa [Davies Liu] fix pickle itemgetter with cloudpickle (cherry picked from commit 92ef02626e793ea853cced4cbfee316f0b748ed7) Signed-off-by: Josh Rosen <joshro...@apache.org> Conflicts: python/pyspark/tests.py Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0bc72eb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0bc72eb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0bc72eb Branch: refs/heads/branch-1.0 Commit: e0bc72eb7238f5ea7dc075049dea6da9170c1998 Parents: 2693035 Author: Davies Liu <davies....@gmail.com> Authored: Tue Jul 29 01:02:18 2014 -0700 Committer: Josh Rosen <joshro...@apache.org> Committed: Tue Jul 29 01:04:32 2014 -0700 ---------------------------------------------------------------------- python/pyspark/cloudpickle.py | 5 +++-- python/pyspark/tests.py | 6 ++++++ 2 files changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e0bc72eb/python/pyspark/cloudpickle.py ---------------------------------------------------------------------- diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index eb5dbb8..ad2d35a 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -560,8 +560,9 @@ class CloudPickler(pickle.Pickler): ] - itemgetter_obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents - return self.save_reduce(operator.itemgetter, (itemgetter_obj.item,)) + obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents + return self.save_reduce(operator.itemgetter, + obj.item if obj.nitems > 1 else (obj.item,)) if 
PyObject_HEAD: dispatch[operator.itemgetter] = save_itemgetter http://git-wip-us.apache.org/repos/asf/spark/blob/e0bc72eb/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 2ea0554..45284ee 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -198,6 +198,12 @@ class TestRDDFunctions(PySparkTestCase): os.unlink(tempFile.name) self.assertRaises(Exception, lambda: filtered_data.count()) + def test_itemgetter(self): + rdd = self.sc.parallelize([range(10)]) + from operator import itemgetter + self.assertEqual([1], rdd.map(itemgetter(1)).collect()) + self.assertEqual([(2, 3)], rdd.map(itemgetter(2, 3)).collect()) + class TestIO(PySparkTestCase):