Eric Liang created SPARK-17472: ---------------------------------- Summary: Better error message for serialization failures of large objects in Python Key: SPARK-17472 URL: https://issues.apache.org/jira/browse/SPARK-17472 Project: Spark Issue Type: Improvement Components: PySpark Reporter: Eric Liang Priority: Minor
{code} def run(): import numpy.random as nr b = nr.bytes(8 * 1000000000) sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count() run() {code} Gives you the following error from pickle {code} error: 'i' format requires -2147483648 <= number <= 2147483647 --------------------------------------------------------------------------- error Traceback (most recent call last) <ipython-input-14-ba73d84faba7> in <module>() 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count() 5 ----> 6 run() <ipython-input-14-ba73d84faba7> in run() 2 import numpy.random as nr 3 b = nr.bytes(8 * 1000000000) ----> 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count() 5 6 run() /databricks/spark/python/pyspark/rdd.pyc in count(self) 1002 3 1003 """ -> 1004 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 1005 1006 def stats(self): /databricks/spark/python/pyspark/rdd.pyc in sum(self) 993 6.0 994 """ --> 995 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 996 997 def count(self): /databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op) 867 # zeroValue provided to each partition is unique from the one provided 868 # to the final reduce call --> 869 vals = self.mapPartitions(func).collect() 870 return reduce(op, vals, zeroValue) 871 /databricks/spark/python/pyspark/rdd.pyc in collect(self) 769 """ 770 with SCCallSiteSync(self.context) as css: --> 771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 772 return list(_load_from_socket(port, self._jrdd_deserializer)) 773 /databricks/spark/python/pyspark/rdd.pyc in _jrdd(self) 2377 command = (self.func, profiler, self._prev_jrdd_deserializer, 2378 self._jrdd_deserializer) -> 2379 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self) 2380 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 2381 bytearray(pickled_cmd), /databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj) 2297 # the 
serialized command will be compressed by broadcast 2298 ser = CloudPickleSerializer() -> 2299 pickled_command = ser.dumps(command) 2300 if len(pickled_command) > (1 << 20): # 1M 2301 # The broadcast will have same life cycle as created PythonRDD /databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj) 426 427 def dumps(self, obj): --> 428 return cloudpickle.dumps(obj, 2) 429 430 /databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol) 655 656 cp = CloudPickler(file,protocol) --> 657 cp.dump(obj) 658 659 return file.getvalue() /databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj) 105 self.inject_addons() 106 try: --> 107 return Pickler.dump(self, obj) 108 except RuntimeError as e: 109 if 'recursion' in e.args[0]: /usr/lib/python2.7/pickle.pyc in dump(self, obj) 222 if self.proto >= 2: 223 self.write(PROTO + chr(self.proto)) --> 224 self.save(obj) 225 self.write(STOP) 226 /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj) 560 write(MARK) 561 for element in obj: --> 562 save(element) 563 564 if id(obj) in memo: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name) 202 klass = getattr(themodule, name, None) 203 if klass is None or klass is not obj: --> 204 self.save_function_tuple(obj) 205 return 206 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func) 239 # create a skeleton function object and memoize it 240 save(_make_skel_func) --> 241 save((code, closure, base_globals)) 242 write(pickle.REDUCE) 243 self.memoize(func) /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call 
unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj) 546 if n <= 3 and proto >= 2: 547 for element in obj: --> 548 save(element) 549 # Subtle. Same as in the big comment below. 550 if id(obj) in memo: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_list(self, obj) 598 599 self.memoize(obj) --> 600 self._batch_appends(iter(obj)) 601 602 dispatch[ListType] = save_list /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items) 631 write(MARK) 632 for x in tmp: --> 633 save(x) 634 write(APPENDS) 635 elif n: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name) 202 klass = getattr(themodule, name, None) 203 if klass is None or klass is not obj: --> 204 self.save_function_tuple(obj) 205 return 206 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func) 239 # create a skeleton function object and memoize it 240 save(_make_skel_func) --> 241 save((code, closure, base_globals)) 242 write(pickle.REDUCE) 243 self.memoize(func) /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj) 546 if n <= 3 and proto >= 2: 547 for element in obj: --> 548 save(element) 549 # Subtle. Same as in the big comment below. 
550 if id(obj) in memo: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_list(self, obj) 598 599 self.memoize(obj) --> 600 self._batch_appends(iter(obj)) 601 602 dispatch[ListType] = save_list /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items) 631 write(MARK) 632 for x in tmp: --> 633 save(x) 634 write(APPENDS) 635 elif n: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name) 202 klass = getattr(themodule, name, None) 203 if klass is None or klass is not obj: --> 204 self.save_function_tuple(obj) 205 return 206 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func) 239 # create a skeleton function object and memoize it 240 save(_make_skel_func) --> 241 save((code, closure, base_globals)) 242 write(pickle.REDUCE) 243 self.memoize(func) /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj) 546 if n <= 3 and proto >= 2: 547 for element in obj: --> 548 save(element) 549 # Subtle. Same as in the big comment below. 
550 if id(obj) in memo: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_list(self, obj) 598 599 self.memoize(obj) --> 600 self._batch_appends(iter(obj)) 601 602 dispatch[ListType] = save_list /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items) 631 write(MARK) 632 for x in tmp: --> 633 save(x) 634 write(APPENDS) 635 elif n: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name) 202 klass = getattr(themodule, name, None) 203 if klass is None or klass is not obj: --> 204 self.save_function_tuple(obj) 205 return 206 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func) 239 # create a skeleton function object and memoize it 240 save(_make_skel_func) --> 241 save((code, closure, base_globals)) 242 write(pickle.REDUCE) 243 self.memoize(func) /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj) 546 if n <= 3 and proto >= 2: 547 for element in obj: --> 548 save(element) 549 # Subtle. Same as in the big comment below. 
550 if id(obj) in memo: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_list(self, obj) 598 599 self.memoize(obj) --> 600 self._batch_appends(iter(obj)) 601 602 dispatch[ListType] = save_list /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items) 634 write(APPENDS) 635 elif n: --> 636 save(tmp[0]) 637 write(APPEND) 638 # else tmp is empty, and we're done /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name) 196 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None: 197 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule) --> 198 self.save_function_tuple(obj) 199 return 200 else: /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func) 239 # create a skeleton function object and memoize it 240 save(_make_skel_func) --> 241 save((code, closure, base_globals)) 242 write(pickle.REDUCE) 243 self.memoize(func) /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj) 546 if n <= 3 and proto >= 2: 547 for element in obj: --> 548 save(element) 549 # Subtle. Same as in the big comment below. 
550 if id(obj) in memo: /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_list(self, obj) 598 599 self.memoize(obj) --> 600 self._batch_appends(iter(obj)) 601 602 dispatch[ListType] = save_list /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items) 634 write(APPENDS) 635 elif n: --> 636 save(tmp[0]) 637 write(APPEND) 638 # else tmp is empty, and we're done /usr/lib/python2.7/pickle.pyc in save(self, obj) 284 f = self.dispatch.get(t) 285 if f: --> 286 f(self, obj) # Call unbound method with explicit self 287 return 288 /usr/lib/python2.7/pickle.pyc in save_string(self, obj, pack) 484 self.write(SHORT_BINSTRING + chr(n) + obj) 485 else: --> 486 self.write(BINSTRING + pack("<i", n) + obj) 487 else: 488 self.write(STRING + repr(obj) + '\n') error: 'i' format requires -2147483648 <= number <= 2147483647 {code} ======================================================= {code} def run(): import numpy.random as nr b = sc.broadcast(nr.bytes(8 * 1000000000)) sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count() run() {code} Gives you {code} --------------------------------------------------------------------------- SystemError Traceback (most recent call last) <ipython-input-14-53cbdb8ed528> in <module>() 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count() 5 ----> 6 run() <ipython-input-14-53cbdb8ed528> in run() 1 def run(): 2 import numpy.random as nr ----> 3 b = sc.broadcast(nr.bytes(8 * 1000000000)) 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count() 5 /databricks/spark/python/pyspark/context.py in broadcast(self, value) 741 be sent to each cluster only once. 
742 """ --> 743 return Broadcast(self, value, self._pickled_broadcast_vars) 744 745 def accumulator(self, value, accum_param=None): /databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path) 68 if sc is not None: 69 f = NamedTemporaryFile(delete=False, dir=sc._temp_dir) ---> 70 self._path = self.dump(value, f) 71 self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path) 72 self._pickle_registry = pickle_registry /databricks/spark/python/pyspark/broadcast.py in dump(self, value, f) 76 77 def dump(self, value, f): ---> 78 pickle.dump(value, f, 2) 79 f.close() 80 return f.name SystemError: error return without exception set {code} In both cases, PySpark should raise a clear error stating that the task closure or broadcast variable could not be serialized because it is too large. Neither the raw pickle error above nor the SystemError says this. The underlying cause is visible in the first traceback: with pickle protocol 2, a string's length is written as a signed 32-bit integer (pack("<i", n)). Any object larger than 2^31 - 1 bytes, such as the 8 GB value here, cannot be pickled. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org