Davies Liu created SPARK-10542:
----------------------------------

             Summary: The PySpark 1.5 closure serializer can't serialize a namedtuple instance.
                 Key: SPARK-10542
                 URL: https://issues.apache.org/jira/browse/SPARK-10542
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 1.5.0
            Reporter: Davies Liu
            Assignee: Davies Liu
            Priority: Critical


Code to reproduce the bug:
{code}
from collections import namedtuple

# A namedtuple class defined in the driver shell (not in an importable module)
PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])

# The closure passed to map() references the namedtuple class, so count()
# forces the closure serializer to pickle the class itself
rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
rdd.count()
{code}
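
The cluster is not actually needed to hit the failure; the sketch below (my reconstruction, not part of the original repro) drives the same CloudPickleSerializer that _prepare_for_python_RDD uses, and should raise the same AttributeError on a 1.5 driver:

{code}
from collections import namedtuple
from pyspark.serializers import CloudPickleSerializer

PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])

# The lambda's globals include the namedtuple class, so cloudpickle pickles
# the class by value and descends into its __dict__ of builtin methods.
ser = CloudPickleSerializer()
ser.dumps(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))  # AttributeError
{code}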

Error message and traceback on Spark 1.5:
{code}
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-5-59448e31019f> in <module>()
      2 PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
      3 rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
----> 4 rdd.count()

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in count(self)
   1004         3
   1005         """
-> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1007 
   1008     def stats(self):

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in sum(self)
    995         6.0
    996         """
--> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    998 
    999     def count(self):

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
    869         # zeroValue provided to each partition is unique from the one provided
    870         # to the final reduce call
--> 871         vals = self.mapPartitions(func).collect()
    872         return reduce(op, vals, zeroValue)
    873 

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in collect(self)
    771         """
    772         with SCCallSiteSync(self.context) as css:
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775 

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
   2383         command = (self.func, profiler, self._prev_jrdd_deserializer,
   2384                    self._jrdd_deserializer)
-> 2385         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
   2386         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2387                                              bytearray(pickled_cmd),

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2303     # the serialized command will be compressed by broadcast
   2304     ser = CloudPickleSerializer()
-> 2305     pickled_command = ser.dumps(command)
   2306     if len(pickled_command) > (1 << 20):  # 1M
   2307         # The broadcast will have same life cycle as created PythonRDD

/home/ubuntu/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
    425 
    426     def dumps(self, obj):
--> 427         return cloudpickle.dumps(obj, 2)
    428 
    429 

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
    639 
    640     cp = CloudPickler(file,protocol)
--> 641     cp.dump(obj)
    642 
    643     return file.getvalue()

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
    105         self.inject_addons()
    106         try:
--> 107             return Pickler.dump(self, obj)
    108         except RuntimeError as e:
    109             if 'recursion' in e.args[0]:

/usr/lib/python2.7/pickle.pyc in dump(self, obj)
    222         if self.proto >= 2:
    223             self.write(PROTO + chr(self.proto))
--> 224         self.save(obj)
    225         self.write(STOP)
    226 

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    560         write(MARK)
    561         for element in obj:
--> 562             save(element)
    563 
    564         if id(obj) in memo:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 
... skipped 23125 bytes ...
    650 
    651     dispatch[DictionaryType] = save_dict

/usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    684                 k, v = tmp[0]
    685                 save(k)
--> 686                 save(v)
    687                 write(SETITEM)
    688             # else tmp is empty, and we're done

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_global(self, obj, name, pack)
    367                     v = v.__func__
    368                 dd[k] = v
--> 369             self.save(dd)
    370             self.write(pickle.TUPLE2)
    371             self.write(pickle.REDUCE)

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
    647 
    648         self.memoize(obj)
--> 649         self._batch_setitems(obj.iteritems())
    650 
    651     dispatch[DictionaryType] = save_dict

/usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    679                 for k, v in tmp:
    680                     save(k)
--> 681                     save(v)
    682                 write(SETITEMS)
    683             elif n:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
    191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
    192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193             self.save_function_tuple(obj)
    194             return
    195         else:

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
    240         # save the rest of the func data needed by _fill_function
    241         save(f_globals)
--> 242         save(defaults)
    243         save(dct)
    244         write(pickle.TUPLE)

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    546         if n <= 3 and proto >= 2:
    547             for element in obj:
--> 548                 save(element)
    549             # Subtle.  Same as in the big comment below.
    550             if id(obj) in memo:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_builtin_function(self, obj)
    313         if obj.__module__ is "__builtin__":
    314             return self.save_global(obj)
--> 315         return self.save_function(obj)
    316     dispatch[types.BuiltinFunctionType] = save_builtin_function
    317 

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
    189         # we'll pickle the actual function object rather than simply saving a
    190         # reference (as is done in default pickler), via save_function_tuple.
--> 191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
    192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
    193             self.save_function_tuple(obj)

AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
{code}
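
The last frames show the mechanism: save_builtin_function (cloudpickle.py line 313) routes to save_global only when obj.__module__ is "__builtin__" holds by object identity; everything else falls through to save_function (line 191), which reads obj.__code__, an attribute that builtin functions and methods do not have. Since the gate is an identity test rather than an equality test, it silently depends on string interning. A standalone illustration (mine, not from the report):

{code}
# 'is' compares object identity, not value, so the check at
# cloudpickle.py:313 depends on whether the module name happens to be the
# interned "__builtin__" string.
mod = "".join(["__buil", "tin__"])  # value "__builtin__", but a distinct object
print(mod == "__builtin__")         # True
print(mod is "__builtin__")         # False -> falls through to save_function
{code}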

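A possible workaround until this is fixed (an unverified suggestion): define the namedtuple in an importable module instead of the shell, so the class has a resolvable module and can be pickled by reference rather than by value. The module name and distribution mechanism below are hypothetical:

{code}
# power_plant.py -- hypothetical module, shipped to executors (e.g. via
# sc.addPyFile("power_plant.py") or by installing it on every node)
from collections import namedtuple

PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
{code}

{code}
# Driver side: the imported class is referenced by module + name, so
# cloudpickle never walks its __dict__ of builtin methods.
from power_plant import PowerPlantRow

rdd = sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
rdd.count()
{code}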

