Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5db51f911 -> d5c0361e7


[SPARK-10542] [PYSPARK] fix serialize namedtuple

Author: Davies Liu <dav...@databricks.com>

Closes #8707 from davies/fix_namedtuple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5c0361e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5c0361e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5c0361e

Branch: refs/heads/branch-1.5
Commit: d5c0361e7f2535a3893a7172d21881b18aa919d6
Parents: 5db51f9
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Sep 14 19:46:34 2015 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Mon Sep 14 19:49:14 2015 -0700

----------------------------------------------------------------------
 python/pyspark/cloudpickle.py | 15 ++++++++++++++-
 python/pyspark/serializers.py |  1 +
 python/pyspark/tests.py       |  5 +++++
 3 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d5c0361e/python/pyspark/cloudpickle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 3b64798..95b3abc 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -350,6 +350,11 @@ class CloudPickler(Pickler):
             if new_override:
                 d['__new__'] = obj.__new__
 
+            # workaround for namedtuple (hijacked by PySpark)
+            if getattr(obj, '_is_namedtuple_', False):
+                self.save_reduce(_load_namedtuple, (obj.__name__, obj._fields))
+                return
+
             self.save(_load_class)
             self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
             d.pop('__doc__', None)
@@ -382,7 +387,7 @@ class CloudPickler(Pickler):
             self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
         else:
             self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
-                         obj=obj)
+                             obj=obj)
     dispatch[types.MethodType] = save_instancemethod
 
     def save_inst(self, obj):
@@ -744,6 +749,14 @@ def _load_class(cls, d):
     return cls
 
 
+def _load_namedtuple(name, fields):
+    """
+    Loads a class generated by namedtuple
+    """
+    from collections import namedtuple
+    return namedtuple(name, fields)
+
+
 """Constructors for 3rd party libraries
 Note: These can never be renamed due to client compatibility issues"""
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d5c0361e/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 411b4db..2a13269 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -359,6 +359,7 @@ def _hack_namedtuple(cls):
     def __reduce__(self):
         return (_restore, (name, fields, tuple(self)))
     cls.__reduce__ = __reduce__
+    cls._is_namedtuple_ = True
     return cls
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d5c0361e/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8bfed07..647504c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -218,6 +218,11 @@ class SerializationTestCase(unittest.TestCase):
         p2 = loads(dumps(p1, 2))
         self.assertEqual(p1, p2)
 
+        from pyspark.cloudpickle import dumps
+        P2 = loads(dumps(P))
+        p3 = P2(1, 3)
+        self.assertEqual(p1, p3)
+
     def test_itemgetter(self):
         from operator import itemgetter
         ser = CloudPickleSerializer()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to