Repository: spark Updated Branches: refs/heads/branch-1.5 5db51f911 -> d5c0361e7
[SPARK-10542] [PYSPARK] fix serialize namedtuple Author: Davies Liu <dav...@databricks.com> Closes #8707 from davies/fix_namedtuple. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5c0361e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5c0361e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5c0361e Branch: refs/heads/branch-1.5 Commit: d5c0361e7f2535a3893a7172d21881b18aa919d6 Parents: 5db51f9 Author: Davies Liu <dav...@databricks.com> Authored: Mon Sep 14 19:46:34 2015 -0700 Committer: Davies Liu <davies....@gmail.com> Committed: Mon Sep 14 19:49:14 2015 -0700 ---------------------------------------------------------------------- python/pyspark/cloudpickle.py | 15 ++++++++++++++- python/pyspark/serializers.py | 1 + python/pyspark/tests.py | 5 +++++ 3 files changed, 20 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d5c0361e/python/pyspark/cloudpickle.py ---------------------------------------------------------------------- diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 3b64798..95b3abc 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -350,6 +350,11 @@ class CloudPickler(Pickler): if new_override: d['__new__'] = obj.__new__ + # workaround for namedtuple (hijacked by PySpark) + if getattr(obj, '_is_namedtuple_', False): + self.save_reduce(_load_namedtuple, (obj.__name__, obj._fields)) + return + self.save(_load_class) self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj) d.pop('__doc__', None) @@ -382,7 +387,7 @@ class CloudPickler(Pickler): self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj) else: self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__), - obj=obj) + obj=obj) dispatch[types.MethodType] = save_instancemethod def save_inst(self, obj): @@ -744,6 +749,14 @@ def _load_class(cls, d): return cls +def _load_namedtuple(name, fields): + """ + Loads a class generated by namedtuple + """ + from collections import namedtuple + return namedtuple(name, fields) + + """Constructors for 3rd party libraries Note: These can never be renamed due to client compatibility issues""" http://git-wip-us.apache.org/repos/asf/spark/blob/d5c0361e/python/pyspark/serializers.py ---------------------------------------------------------------------- diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 411b4db..2a13269 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -359,6 +359,7 @@ def _hack_namedtuple(cls): def __reduce__(self): return (_restore, (name, fields, tuple(self))) cls.__reduce__ = __reduce__ + cls._is_namedtuple_ = True return cls http://git-wip-us.apache.org/repos/asf/spark/blob/d5c0361e/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 8bfed07..647504c 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -218,6 +218,11 @@ class SerializationTestCase(unittest.TestCase): p2 = loads(dumps(p1, 2)) self.assertEqual(p1, p2) + from pyspark.cloudpickle import dumps + P2 = loads(dumps(P)) + p3 = P2(1, 3) + self.assertEqual(p1, p3) + def test_itemgetter(self): from operator import itemgetter ser = CloudPickleSerializer() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org