[ https://issues.apache.org/jira/browse/SPARK-21439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maciej Bryński updated SPARK-21439:
-----------------------------------
    Component/s: PySpark

> Cannot use Spark with Python ABCMeta (exception from cloudpickle)
> -----------------------------------------------------------------
>
>                 Key: SPARK-21439
>                 URL: https://issues.apache.org/jira/browse/SPARK-21439
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 2.1.1
>            Reporter: Maciej Bryński
>
> I'm trying to run code that uses ABCMeta. The snippet below raises an exception:
> {code}
> from abc import ABCMeta, abstractmethod
>
> class A(metaclass=ABCMeta):
>     @abstractmethod
>     def x(self):
>         """Abstract"""
>
> class B(A):
>     def x(self):
>         return 10
>
> b = B()
> sc.range(10).map(lambda x: b.x()).collect()
> {code}
> Exception:
> {code}
> ---------------------------------------------------------------------------
> AttributeError                            Traceback (most recent call last)
> /opt/spark/python/pyspark/cloudpickle.py in dump(self, obj)
>     146         try:
> --> 147             return Pickler.dump(self, obj)
>     148         except RuntimeError as e:
> /usr/lib/python3.4/pickle.py in dump(self, obj)
>     409         self.framer.start_framing()
> --> 410         self.save(obj)
>     411         self.write(STOP)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     741             for element in obj:
> --> 742                 save(element)
>     743
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     253             if klass is None or klass is not obj:
> --> 254                 self.save_function_tuple(obj)
>     255                 return
> /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
>     290         save(_make_skel_func)
> --> 291         save((code, closure, base_globals))
>     292         write(pickle.REDUCE)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726         for element in obj:
> --> 727             save(element)
>     728         # Subtle. Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_list(self, obj)
>     771         self.memoize(obj)
> --> 772         self._batch_appends(obj)
>     773
> /usr/lib/python3.4/pickle.py in _batch_appends(self, items)
>     795                 for x in tmp:
> --> 796                     save(x)
>     797                 write(APPENDS)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     253             if klass is None or klass is not obj:
> --> 254                 self.save_function_tuple(obj)
>     255                 return
> /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
>     290         save(_make_skel_func)
> --> 291         save((code, closure, base_globals))
>     292         write(pickle.REDUCE)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726         for element in obj:
> --> 727             save(element)
>     728         # Subtle. Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_list(self, obj)
>     771         self.memoize(obj)
> --> 772         self._batch_appends(obj)
>     773
> /usr/lib/python3.4/pickle.py in _batch_appends(self, items)
>     798             elif n:
> --> 799                 save(tmp[0])
>     800                 write(APPEND)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     247             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> --> 248             self.save_function_tuple(obj)
>     249             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
>     295         # save the rest of the func data needed by _fill_function
> --> 296         save(f_globals)
>     297         save(defaults)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_dict(self, obj)
>     811         self.memoize(obj)
> --> 812         self._batch_setitems(obj.items())
>     813
> /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
>     842                 save(k)
> --> 843                 save(v)
>     844                 write(SETITEM)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     521         # Save the reduce() output and finally memoize the object
> --> 522         self.save_reduce(obj=obj, *rv)
>     523
> /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     565             args = args[1:]
> --> 566             save(cls)
>     567
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     490         if issc:
> --> 491             self.save_global(obj)
>     492             return
> /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
>     415             self.save(_load_class)
> --> 416             self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
>     417             d.pop('__doc__', None)
> /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     580         save(func)
> --> 581         save(args)
>     582         write(pickle.REDUCE)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726         for element in obj:
> --> 727             save(element)
>     728         # Subtle. Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726         for element in obj:
> --> 727             save(element)
>     728         # Subtle. Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     490         if issc:
> --> 491             self.save_global(obj)
>     492             return
> /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
>     430                 dd[k] = v
> --> 431         self.save(dd)
>     432         self.write(pickle.TUPLE2)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_dict(self, obj)
>     811         self.memoize(obj)
> --> 812         self._batch_setitems(obj.items())
>     813
> /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
>     837                     save(k)
> --> 838                     save(v)
>     839                 write(SETITEMS)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     521         # Save the reduce() output and finally memoize the object
> --> 522         self.save_reduce(obj=obj, *rv)
>     523
> /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     598         if state is not None:
> --> 599             save(state)
>     600             write(pickle.BUILD)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_dict(self, obj)
>     811         self.memoize(obj)
> --> 812         self._batch_setitems(obj.items())
>     813
> /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
>     837                     save(k)
> --> 838                     save(v)
>     839                 write(SETITEMS)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     253             if klass is None or klass is not obj:
> --> 254                 self.save_function_tuple(obj)
>     255                 return
> /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
>     296         save(f_globals)
> --> 297         save(defaults)
>     298         save(dct)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726         for element in obj:
> --> 727             save(element)
>     728         # Subtle. Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     521         # Save the reduce() output and finally memoize the object
> --> 522         self.save_reduce(obj=obj, *rv)
>     523
> /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     565             args = args[1:]
> --> 566             save(cls)
>     567
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
>     430                 dd[k] = v
> --> 431         self.save(dd)
>     432         self.write(pickle.TUPLE2)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_dict(self, obj)
>     811         self.memoize(obj)
> --> 812         self._batch_setitems(obj.items())
>     813
> /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
>     837                     save(k)
> --> 838                     save(v)
>     839                 write(SETITEMS)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj)  # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_builtin_function(self, obj)
>     366             return self.save_global(obj)
> --> 367         return self.save_function(obj)
>     368     dispatch[types.BuiltinFunctionType] = save_builtin_function
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     245         # reference (as is done in default pickler), via save_function_tuple.
> --> 246         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
>     247             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
>
> During handling of the above exception, another exception occurred:
>
> AttributeError                            Traceback (most recent call last)
> <ipython-input-8-9ea6e84ab4cc> in <module>()
> ----> 1 sc.range(10).map(lambda x: b.x()).collect()
> /opt/spark/python/pyspark/rdd.py in collect(self)
>     806         """
>     807         with SCCallSiteSync(self.context) as css:
> --> 808             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>     809         return list(_load_from_socket(port, self._jrdd_deserializer))
>     810
> /opt/spark/python/pyspark/rdd.py in _jrdd(self)
>    2438
>    2439         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
> -> 2440                                       self._jrdd_deserializer, profiler)
>    2441         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
>    2442                                              self.preservesPartitioning)
> /opt/spark/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
>    2371     assert serializer, "serializer should not be empty"
>    2372     command = (func, profiler, deserializer, serializer)
> -> 2373     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
>    2374     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
>    2375                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)
> /opt/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
>    2357     # the serialized command will be compressed by broadcast
>    2358     ser = CloudPickleSerializer()
> -> 2359     pickled_command = ser.dumps(command)
>    2360     if len(pickled_command) > (1 << 20):  # 1M
>    2361         # The broadcast will have same life cycle as created PythonRDD
> /opt/spark/python/pyspark/serializers.py in dumps(self, obj)
>     458
>     459     def dumps(self, obj):
> --> 460         return cloudpickle.dumps(obj, 2)
>     461
>     462
> /opt/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
>     701
>     702     cp = CloudPickler(file,protocol)
> --> 703     cp.dump(obj)
>     704
>     705     return file.getvalue()
> /opt/spark/python/pyspark/cloudpickle.py in dump(self, obj)
>     153             raise
>     154         except Exception as e:
> --> 155             if "'i' format requires" in e.message:
>     156                 msg = "Object too large to serialize: " + e.message
>     157             else:
> AttributeError: 'AttributeError' object has no attribute 'message'
> {code}
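> For what it's worth, the failure does not need a SparkContext at all: the traceback above dies inside ser.dumps(command), which is plain cloudpickle, so calling the bundled cloudpickle directly on a closure over an ABCMeta-derived instance should hit the same code path. A minimal sketch, assuming pyspark's bundled cloudpickle module is importable:
> {code}
> from abc import ABCMeta, abstractmethod
> from pyspark import cloudpickle
>
> class A(metaclass=ABCMeta):
>     @abstractmethod
>     def x(self):
>         """Abstract"""
>
> class B(A):
>     def x(self):
>         return 10
>
> b = B()
> # Pickling a closure over `b` forces cloudpickle to serialize class B by
> # value (B lives in __main__); judging by the traceback, the ABCMeta
> # machinery in the class state eventually reaches a builtin function,
> # which has no __code__ attribute -- hence the first AttributeError.
> cloudpickle.dumps(lambda: b.x(), 2)
> {code}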
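> Note that the exception that finally surfaces (AttributeError: 'AttributeError' object has no attribute 'message') is a second, independent bug: BaseException.message only ever existed on Python 2, so the error handler in cloudpickle.py's dump() itself blows up on Python 3 and masks the real error. A small illustration, independent of Spark:
> {code}
> # Python 3 removed the Python 2-only BaseException.message attribute, so
> # evaluating e.message inside an except block raises a fresh AttributeError
> # that hides the original exception.
> try:
>     raise AttributeError("original error")
> except Exception as e:
>     print(hasattr(e, "message"))  # False on Python 3
>     print(str(e))                 # portable way to get the message text
> {code}
> Using str(e) (or getattr(e, "message", str(e)) for 2/3 compatibility) in that handler would at least let the underlying error through.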
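> A possible workaround until this is fixed: move the classes into a real module that the executors can import. Classes importable from a module should be pickled by reference (module + name) rather than reconstructed by value, which avoids the failing save_global path entirely. Sketch only; the module name abc_defs is made up, and the file has to be on the executors' PYTHONPATH (e.g. shipped via --py-files):
> {code}
> # abc_defs.py -- hypothetical module distributed to the executors
> from abc import ABCMeta, abstractmethod
>
> class A(metaclass=ABCMeta):
>     @abstractmethod
>     def x(self):
>         """Abstract"""
>
> class B(A):
>     def x(self):
>         return 10
> {code}
> {code}
> # driver side: B is now importable, so cloudpickle should never serialize
> # the ABCMeta internals
> from abc_defs import B
>
> b = B()
> sc.range(10).map(lambda x: b.x()).collect()  # expected: [10] * 10
> {code}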