[ https://issues.apache.org/jira/browse/SPARK-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-10542:
------------------------------------

    Assignee: Davies Liu  (was: Apache Spark)

> The PySpark 1.5 closure serializer can't serialize a namedtuple instance.
> --------------------------------------------------------------------------
>
>                 Key: SPARK-10542
>                 URL: https://issues.apache.org/jira/browse/SPARK-10542
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.0
>            Reporter: Davies Liu
>            Assignee: Davies Liu
>            Priority: Critical
>
> Code to Reproduce Bug:
> {code}
> from collections import namedtuple
> PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
> rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
> rdd.count()
> {code}
> Error message on Spark 1.5:
> {code}
> AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
> ---------------------------------------------------------------------------
> AttributeError                            Traceback (most recent call last)
> <ipython-input-5-59448e31019f> in <module>()
>       2 PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
>       3 rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
> ----> 4 rdd.count()
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in count(self)
>    1004         3
>    1005         """
> -> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>    1007
>    1008     def stats(self):
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in sum(self)
>     995         6.0
>     996         """
> --> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>     998
>     999     def count(self):
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
>     869         # zeroValue provided to each partition is unique from the one provided
>     870         # to the final reduce call
> --> 871         vals = self.mapPartitions(func).collect()
>     872         return reduce(op, vals, zeroValue)
>     873
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in collect(self)
>     771         """
>     772         with SCCallSiteSync(self.context) as css:
> --> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>     774         return list(_load_from_socket(port, self._jrdd_deserializer))
>     775
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
>    2383         command = (self.func, profiler, self._prev_jrdd_deserializer,
>    2384                    self._jrdd_deserializer)
> -> 2385         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
>    2386         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
>    2387                                              bytearray(pickled_cmd),
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
>    2303     # the serialized command will be compressed by broadcast
>    2304     ser = CloudPickleSerializer()
> -> 2305     pickled_command = ser.dumps(command)
>    2306     if len(pickled_command) > (1 << 20):  # 1M
>    2307         # The broadcast will have same life cycle as created PythonRDD
> /home/ubuntu/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
>     425
>     426     def dumps(self, obj):
> --> 427         return cloudpickle.dumps(obj, 2)
>     428
>     429
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
>     639
>     640     cp = CloudPickler(file,protocol)
> --> 641     cp.dump(obj)
>     642
>     643     return file.getvalue()
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
>     105         self.inject_addons()
>     106         try:
> --> 107             return Pickler.dump(self, obj)
>     108         except RuntimeError as e:
>     109             if 'recursion' in e.args[0]:
> /usr/lib/python2.7/pickle.pyc in dump(self, obj)
>     222         if self.proto >= 2:
>     223             self.write(PROTO + chr(self.proto))
> --> 224         self.save(obj)
>     225         self.write(STOP)
>     226
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     560         write(MARK)
>     561         for element in obj:
> --> 562             save(element)
>     563
>     564         if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288
> ... skipped 23125 bytes ...
>     650
>     651     dispatch[DictionaryType] = save_dict
> /usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
>     684                 k, v = tmp[0]
>     685                 save(k)
> --> 686                 save(v)
>     687                 write(SETITEM)
>     688             # else tmp is empty, and we're done
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_global(self, obj, name, pack)
>     367                 v = v.__func__
>     368             dd[k] = v
> --> 369         self.save(dd)
>     370         self.write(pickle.TUPLE2)
>     371         self.write(pickle.REDUCE)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288
> /usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
>     647
>     648         self.memoize(obj)
> --> 649         self._batch_setitems(obj.iteritems())
>     650
>     651     dispatch[DictionaryType] = save_dict
> /usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
>     679                 for k, v in tmp:
>     680                     save(k)
> --> 681                     save(v)
>     682                 write(SETITEMS)
>     683             elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
>     192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> --> 193             self.save_function_tuple(obj)
>     194             return
>     195         else:
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
>     240         # save the rest of the func data needed by _fill_function
>     241         save(f_globals)
> --> 242         save(defaults)
>     243         save(dct)
>     244         write(pickle.TUPLE)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     546         if n <= 3 and proto >= 2:
>     547             for element in obj:
> --> 548                 save(element)
>     549             # Subtle.  Same as in the big comment below.
>     550         if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_builtin_function(self, obj)
>     313         if obj.__module__ is "__builtin__":
>     314             return self.save_global(obj)
> --> 315         return self.save_function(obj)
>     316     dispatch[types.BuiltinFunctionType] = save_builtin_function
>     317
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     189         # we'll pickle the actual function object rather than simply saving a
>     190         # reference (as is done in default pickler), via save_function_tuple.
> --> 191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
>     192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
>     193             self.save_function_tuple(obj)
> AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
> {code}
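
Editor's note: the last two frames show the failure mode. save_builtin_function guards on "obj.__module__ is '__builtin__'", and when that identity test is false it falls through to save_function, which reads obj.__code__, an attribute that builtin functions and methods never have. In Python, "is" compares object identity rather than string equality, so the guard can fail even when __module__ equals "__builtin__"; whether two equal string literals share one object is a CPython interning detail. A minimal sketch of that pitfall, independent of Spark:

{code}
# "is" tests object identity; two equal strings need not be the same object,
# so a guard written with "is" is unreliable. "==" is the correct comparison.
mod = "".join(["__built", "in__"])  # equal to "__builtin__", but built at runtime

print(mod == "__builtin__")  # True: the strings are equal
print(mod is "__builtin__")  # typically False: two distinct string objects
{code}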
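For context, the registration at cloudpickle.py line 316 (dispatch[types.BuiltinFunctionType] = save_builtin_function) is how CloudPickler routes builtin functions. One hypothetical local workaround, offered only as a sketch and not as the actual Spark fix, is to always save builtins by named reference, since they carry no bytecode to serialize by value:

{code}
import types
from pyspark import cloudpickle  # module path shown in the traceback above

def save_builtin_function(self, obj):
    # Builtin functions/methods have no __code__, so pickling them "by value"
    # via save_function() can never succeed; fall back to a named global
    # reference, as save_global() already does on the line-314 path.
    # Assumes obj is importable by name on the worker side.
    return self.save_global(obj)

# Replace the buggy handler on the CloudPickler dispatch table.
cloudpickle.CloudPickler.dispatch[types.BuiltinFunctionType] = save_builtin_function
{code}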