[ https://issues.apache.org/jira/browse/SPARK-17472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Davies Liu resolved SPARK-17472.
--------------------------------
    Resolution: Fixed
 Fix Version/s: 2.1.0

Issue resolved by pull request 15026
[https://github.com/apache/spark/pull/15026]

> Better error message for serialization failures of large objects in Python
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-17472
>                 URL: https://issues.apache.org/jira/browse/SPARK-17472
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>            Reporter: Eric Liang
>            Priority: Minor
>             Fix For: 2.1.0
>
>
> {code}
> def run():
>     import numpy.random as nr
>     b = nr.bytes(8 * 1000000000)
>     sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
> run()
> {code}
> Gives you the following error from pickle
> {code}
> error: 'i' format requires -2147483648 <= number <= 2147483647
> ---------------------------------------------------------------------------
> error                                     Traceback (most recent call last)
> <ipython-input-14-ba73d84faba7> in <module>()
>       4     sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
>       5
> ----> 6 run()
> <ipython-input-14-ba73d84faba7> in run()
>       2     import numpy.random as nr
>       3     b = nr.bytes(8 * 1000000000)
> ----> 4     sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
>       5
>       6 run()
> /databricks/spark/python/pyspark/rdd.pyc in count(self)
>    1002         3
>    1003         """
> -> 1004         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>    1005
>    1006     def stats(self):
> /databricks/spark/python/pyspark/rdd.pyc in sum(self)
>     993         6.0
>     994         """
> --> 995         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>     996
>     997     def count(self):
> /databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
>     867         # zeroValue provided to each partition is unique from the one provided
>     868         # to the final reduce call
> --> 869         vals = self.mapPartitions(func).collect()
>     870         return reduce(op, vals, zeroValue)
>     871
> /databricks/spark/python/pyspark/rdd.pyc in collect(self)
>     769         """
>     770         with SCCallSiteSync(self.context) as css:
> --> 771             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>     772         return list(_load_from_socket(port, self._jrdd_deserializer))
>     773
> /databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
>    2377         command = (self.func, profiler, self._prev_jrdd_deserializer,
>    2378                    self._jrdd_deserializer)
> -> 2379         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
>    2380         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
>    2381                                              bytearray(pickled_cmd),
> /databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
>    2297     # the serialized command will be compressed by broadcast
>    2298     ser = CloudPickleSerializer()
> -> 2299     pickled_command = ser.dumps(command)
>    2300     if len(pickled_command) > (1 << 20):  # 1M
>    2301         # The broadcast will have same life cycle as created PythonRDD
> /databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
>     426
>     427     def dumps(self, obj):
> --> 428         return cloudpickle.dumps(obj, 2)
>     429
>     430
> /databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
>     655
>     656     cp = CloudPickler(file,protocol)
> --> 657     cp.dump(obj)
>     658
>     659     return file.getvalue()
> /databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
>     105         self.inject_addons()
>     106         try:
> --> 107             return Pickler.dump(self, obj)
>     108         except RuntimeError as e:
>     109             if 'recursion' in e.args[0]:
> /usr/lib/python2.7/pickle.pyc in dump(self, obj)
>     222         if self.proto >= 2:
>     223             self.write(PROTO + chr(self.proto))
> --> 224         self.save(obj)
>     225         self.write(STOP)
>     226
> [... repeated pickle.pyc save / save_tuple / save_list / _batch_appends and
> cloudpickle.pyc save_function / save_function_tuple frames elided ...]
> /usr/lib/python2.7/pickle.pyc in save_string(self, obj, pack)
>     484             self.write(SHORT_BINSTRING + chr(n) + obj)
>     485         else:
> --> 486             self.write(BINSTRING + pack("<i", n) + obj)
>     487     else:
>     488         self.write(STRING + repr(obj) + '\n')
> error: 'i' format requires -2147483648 <= number <= 2147483647
> {code}
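> The last frame is the root cause: pickle protocol 2 frames a byte string with
> the BINSTRING opcode, whose length header is packed with the signed 32-bit
> "<i" struct format, so no single string over 2**31 - 1 bytes can be written.
> A minimal sketch of just that limit, with no Spark involved (illustration
> only, not part of the ticket):
> {code}
> import struct
>
> # pickle.py line 486 above packs the string length as a signed 32-bit int,
> # so an 8 GB payload overflows the header before any data is written.
> n = 8 * 1000000000
> try:
>     struct.pack("<i", n)
> except struct.error as e:
>     print(e)  # 'i' format requires -2147483648 <= number <= 2147483647
> {code}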
> =======================================================
> {code}
> def run():
>     import numpy.random as nr
>     b = sc.broadcast(nr.bytes(8 * 1000000000))
>     sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
> run()
> {code}
> Gives you
> {code}
> ---------------------------------------------------------------------------
> SystemError                               Traceback (most recent call last)
> <ipython-input-14-53cbdb8ed528> in <module>()
>       4     sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
>       5
> ----> 6 run()
> <ipython-input-14-53cbdb8ed528> in run()
>       1 def run():
>       2     import numpy.random as nr
> ----> 3     b = sc.broadcast(nr.bytes(8 * 1000000000))
>       4     sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
>       5
> /databricks/spark/python/pyspark/context.py in broadcast(self, value)
>     741         be sent to each cluster only once.
>     742         """
> --> 743         return Broadcast(self, value, self._pickled_broadcast_vars)
>     744
>     745     def accumulator(self, value, accum_param=None):
> /databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
>      68         if sc is not None:
>      69             f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
> ---> 70             self._path = self.dump(value, f)
>      71             self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
>      72             self._pickle_registry = pickle_registry
> /databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
>      76
>      77     def dump(self, value, f):
> ---> 78         pickle.dump(value, f, 2)
>      79         f.close()
>      80         return f.name
> SystemError: error return without exception set
> {code}
> In both cases, we should have a better error saying that the task or
> broadcast could not be serialized or was too big.
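> For illustration, a guard of roughly this shape would turn both failures into
> an explicit message (a hypothetical helper, not the actual change made by
> pull request 15026):
> {code}
> import pickle
> import struct
>
> def dump_with_better_error(value, f, protocol=2):
>     # Hypothetical helper, for illustration only: pickling a >2GB object
>     # with protocol 2 surfaces as struct.error in the pure-Python pickler
>     # and as a bare SystemError in the C pickler; re-raise both with a
>     # message that names the real problem.
>     try:
>         pickle.dump(value, f, protocol)
>     except (SystemError, struct.error) as e:
>         raise pickle.PicklingError(
>             "Object too large to serialize (likely over 2GB): %s" % e)
> {code}
> PySpark could apply the same idea in Broadcast.dump and
> _prepare_for_python_RDD so users see the size problem directly.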
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org