Eric Liang created SPARK-17472:
----------------------------------

             Summary: Better error message for serialization failures of large objects in Python
                 Key: SPARK-17472
                 URL: https://issues.apache.org/jira/browse/SPARK-17472
             Project: Spark
          Issue Type: Improvement
          Components: PySpark
            Reporter: Eric Liang
            Priority: Minor


{code}
# Assumes a live SparkContext `sc` (e.g. in the pyspark shell).
def run():
  import numpy.random as nr
  b = nr.bytes(8 * 1000000000)  # ~8 GB byte string, captured by the closure below
  sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()

run()
{code}

Gives you the following error from pickle (the lambda closes over b, so the ~8 GB string is pickled into the task closure):
{code}
error: 'i' format requires -2147483648 <= number <= 2147483647

---------------------------------------------------------------------------
error                                     Traceback (most recent call last)
<ipython-input-14-ba73d84faba7> in <module>()
      4   sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
      5 
----> 6 run()

<ipython-input-14-ba73d84faba7> in run()
      2   import numpy.random as nr
      3   b = nr.bytes(8 * 1000000000)
----> 4   sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
      5 
      6 run()

/databricks/spark/python/pyspark/rdd.pyc in count(self)
   1002         3
   1003         """
-> 1004         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1005 
   1006     def stats(self):

/databricks/spark/python/pyspark/rdd.pyc in sum(self)
    993         6.0
    994         """
--> 995         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    996 
    997     def count(self):

/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
    867         # zeroValue provided to each partition is unique from the one provided
    868         # to the final reduce call
--> 869         vals = self.mapPartitions(func).collect()
    870         return reduce(op, vals, zeroValue)
    871 

/databricks/spark/python/pyspark/rdd.pyc in collect(self)
    769         """
    770         with SCCallSiteSync(self.context) as css:
--> 771             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    772         return list(_load_from_socket(port, self._jrdd_deserializer))
    773 

/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
   2377         command = (self.func, profiler, self._prev_jrdd_deserializer,
   2378                    self._jrdd_deserializer)
-> 2379         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
   2380         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2381                                              bytearray(pickled_cmd),

/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2297     # the serialized command will be compressed by broadcast
   2298     ser = CloudPickleSerializer()
-> 2299     pickled_command = ser.dumps(command)
   2300     if len(pickled_command) > (1 << 20):  # 1M
   2301         # The broadcast will have same life cycle as created PythonRDD

/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
    426 
    427     def dumps(self, obj):
--> 428         return cloudpickle.dumps(obj, 2)
    429 
    430 

/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
    655 
    656     cp = CloudPickler(file,protocol)
--> 657     cp.dump(obj)
    658 
    659     return file.getvalue()

/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
    105         self.inject_addons()
    106         try:
--> 107             return Pickler.dump(self, obj)
    108         except RuntimeError as e:
    109             if 'recursion' in e.args[0]:

/usr/lib/python2.7/pickle.pyc in dump(self, obj)
    222         if self.proto >= 2:
    223             self.write(PROTO + chr(self.proto))
--> 224         self.save(obj)
    225         self.write(STOP)
    226 

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    560         write(MARK)
    561         for element in obj:
--> 562             save(element)
    563 
    564         if id(obj) in memo:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
    202             klass = getattr(themodule, name, None)
    203             if klass is None or klass is not obj:
--> 204                 self.save_function_tuple(obj)
    205                 return
    206 

/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
    239         # create a skeleton function object and memoize it
    240         save(_make_skel_func)
--> 241         save((code, closure, base_globals))
    242         write(pickle.REDUCE)
    243         self.memoize(func)

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    546         if n <= 3 and proto >= 2:
    547             for element in obj:
--> 548                 save(element)
    549             # Subtle.  Same as in the big comment below.
    550             if id(obj) in memo:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
    598 
    599         self.memoize(obj)
--> 600         self._batch_appends(iter(obj))
    601 
    602     dispatch[ListType] = save_list

/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
    631                 write(MARK)
    632                 for x in tmp:
--> 633                     save(x)
    634                 write(APPENDS)
    635             elif n:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
    202             klass = getattr(themodule, name, None)
    203             if klass is None or klass is not obj:
--> 204                 self.save_function_tuple(obj)
    205                 return
    206 

/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
    239         # create a skeleton function object and memoize it
    240         save(_make_skel_func)
--> 241         save((code, closure, base_globals))
    242         write(pickle.REDUCE)
    243         self.memoize(func)

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    546         if n <= 3 and proto >= 2:
    547             for element in obj:
--> 548                 save(element)
    549             # Subtle.  Same as in the big comment below.
    550             if id(obj) in memo:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
    598 
    599         self.memoize(obj)
--> 600         self._batch_appends(iter(obj))
    601 
    602     dispatch[ListType] = save_list

/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
    631                 write(MARK)
    632                 for x in tmp:
--> 633                     save(x)
    634                 write(APPENDS)
    635             elif n:

[... the save / save_function / save_function_tuple / save_tuple / save_list / _batch_appends cycle above repeats twice more as pickle recurses into the nested closures; elided for brevity ...]

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
    196         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
    197             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 198             self.save_function_tuple(obj)
    199             return
    200         else:

/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
    239         # create a skeleton function object and memoize it
    240         save(_make_skel_func)
--> 241         save((code, closure, base_globals))
    242         write(pickle.REDUCE)
    243         self.memoize(func)

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    546         if n <= 3 and proto >= 2:
    547             for element in obj:
--> 548                 save(element)
    549             # Subtle.  Same as in the big comment below.
    550             if id(obj) in memo:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
    598 
    599         self.memoize(obj)
--> 600         self._batch_appends(iter(obj))
    601 
    602     dispatch[ListType] = save_list

/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
    634                 write(APPENDS)
    635             elif n:
--> 636                 save(tmp[0])
    637                 write(APPEND)
    638             # else tmp is empty, and we're done

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/lib/python2.7/pickle.pyc in save_string(self, obj, pack)
    484                 self.write(SHORT_BINSTRING + chr(n) + obj)
    485             else:
--> 486                 self.write(BINSTRING + pack("<i", n) + obj)
    487         else:
    488             self.write(STRING + repr(obj) + '\n')

error: 'i' format requires -2147483648 <= number <= 2147483647
{code}
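
The root cause is visible in the final frame: pickle protocol 2 writes a byte string's length as a signed 32-bit integer (BINSTRING + pack("<i", n)), so any string of 2 GiB or more overflows. A quick demonstration without Spark:

{code}
import struct

# Largest length the protocol-2 BINSTRING opcode can encode:
struct.pack("<i", 2**31 - 1)  # ok

try:
    struct.pack("<i", 8 * 1000000000)  # length of the ~8 GB string above
except struct.error as e:
    print(e)  # on Python 2.7: 'i' format requires -2147483648 <= number <= 2147483647
{code}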

=======================================================

Broadcasting the large object instead of capturing it in the closure:
{code}
def run():
  import numpy.random as nr
  b = sc.broadcast(nr.bytes(8 * 1000000000))  # fails here, while pickling the broadcast value to disk
  sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()

run()
{code}

Gives you:
{code}
---------------------------------------------------------------------------
SystemError                               Traceback (most recent call last)
<ipython-input-14-53cbdb8ed528> in <module>()
      4   sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
      5 
----> 6 run()

<ipython-input-14-53cbdb8ed528> in run()
      1 def run():
      2   import numpy.random as nr
----> 3   b = sc.broadcast(nr.bytes(8 * 1000000000))
      4   sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
      5 

/databricks/spark/python/pyspark/context.py in broadcast(self, value)
    741         be sent to each cluster only once.
    742         """
--> 743         return Broadcast(self, value, self._pickled_broadcast_vars)
    744 
    745     def accumulator(self, value, accum_param=None):

/databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
     68         if sc is not None:
     69             f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
---> 70             self._path = self.dump(value, f)
     71             self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
     72             self._pickle_registry = pickle_registry

/databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
     76 
     77     def dump(self, value, f):
---> 78         pickle.dump(value, f, 2)
     79         f.close()
     80         return f.name

SystemError: error return without exception set
{code}

In both cases, we should raise a clearer error saying that the task closure or broadcast variable could not be serialized or was too large, rather than surfacing these low-level pickle internals.
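
A minimal sketch of the kind of guard that could wrap the two serialization points above (_prepare_for_python_RDD for task closures, Broadcast.dump for broadcasts); the helper name, threshold, and message wording are illustrative only, not a proposed final API:

{code}
import pickle
import struct

# Pickle protocol 2 stores string lengths as a signed 32-bit int (see the
# save_string frame above), so anything at or above 2 GiB cannot be written.
_PICKLE_PROTOCOL_2_LIMIT = 2**31 - 1

def dumps_with_better_error(ser, obj, kind="task"):
    """Hypothetical helper: serialize obj with the given serializer,
    translating low-level pickle failures into an actionable message."""
    try:
        return ser.dumps(obj)
    except (pickle.PicklingError, struct.error, OverflowError, SystemError) as e:
        raise Exception(
            "Could not serialize the %s; it may exceed the %d-byte pickle "
            "limit. Consider broadcasting smaller objects or loading large "
            "data from inside the task instead of capturing it in the "
            "closure. Underlying error: %s" % (kind, _PICKLE_PROTOCOL_2_LIMIT, e))
{code}

The broadcast path could apply the same translation around the pickle.dump(value, f, 2) call in broadcast.py.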


