[ https://issues.apache.org/jira/browse/SPARK-3721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad Miller updated SPARK-3721:
-------------------------------
    Description: 
Attempting to reproduce the bug in isolation in an IPython notebook, I've 
observed the following three distinct failure modes, all of which appear to be 
related to broadcast variable size: everything works below roughly 2 GB and 
breaks above it. Note that I'm running Python 2.7.3 on all machines and using 
the Spark 1.1.0 binaries.

**BLOCK 1** [no problem]
{code:python}
import cPickle
from pyspark import SparkContext

def check_pre_serialized(size):
    # Serialize the payload ourselves, then broadcast the resulting string.
    msg = cPickle.dumps(range(2 ** size))
    print 'serialized length:', len(msg)
    bvar = sc.broadcast(msg)
    print 'length recovered from broadcast variable:', len(bvar.value)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

def check_unserialized(size):
    # Broadcast the raw list and let PySpark handle serialization itself.
    msg = range(2 ** size)
    bvar = sc.broadcast(msg)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.cores.max', '5')
sc = SparkContext('spark://crosby.research.intel-research.net:7077',
                  'broadcast_bug')
{code}

**BLOCK 2**  [no problem]
check_pre_serialized(20)
> serialized length: 9374656
> length recovered from broadcast variable: 9374656
> correct value recovered: True

**BLOCK 3**  [no problem]
check_unserialized(20)
> correct value recovered: True

**BLOCK 4**  [no problem]
check_pre_serialized(27)
> serialized length: 1499501632
> length recovered from broadcast variable: 1499501632
> correct value recovered: True

**BLOCK 5**  [no problem]
check_unserialized(27)
> correct value recovered: True
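
The cutoff falling between size 27 and size 28 is consistent with a signed 
32-bit limit: the size-27 pickle (1499501632 bytes, per BLOCK 4) still fits 
under 2^31 - 1 bytes, while the size-28 pickle is roughly double that and 
does not. A quick arithmetic check:

{code:python}
# Why breakage starts at size 28: the size-27 pickle fits in a signed
# 32-bit int, and doubling it (approximately the size-28 pickle) does not.
>>> 1499501632 < 2 ** 31 - 1
True
>>> 2 * 1499501632 > 2 ** 31 - 1
True
{code}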

**BLOCK 6**  **[ERROR 1: unhandled error from cPickle.dumps inside sc.broadcast]**
check_pre_serialized(28)
.....
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     354
>     355     def dumps(self, obj):
> --> 356         return cPickle.dumps(obj, 2)
>     357
>     358     loads = cPickle.loads
>
> SystemError: error return without exception set
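
For what it's worth, this failure seems reproducible without Spark at all; it 
looks like a Python 2 cPickle limitation once the pickled form crosses about 
2^31 bytes. A minimal standalone sketch (my own repro, assuming a 64-bit 
Python 2.7 build with enough free RAM, not taken from the Spark code path):

{code:python}
# Standalone repro of ERROR 1 (no Spark involved): pickling a string of
# 2 GiB or more with protocol 2 fails the same way pyspark/serializers.py
# does in the traceback above.
import cPickle
cPickle.dumps('x' * (2 ** 31), 2)
# SystemError: error return without exception set
{code}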

**BLOCK 7**  [no problem]
check_unserialized(28)
> correct value recovered: True

**BLOCK 8**  **[ERROR 2: no error occurs and *incorrect result* is returned]**
check_pre_serialized(29)
> serialized length: 6331339840
> length recovered from broadcast variable: 2036372544
> correct value recovered: False
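
Note that the corrupted length isn't random: it is exactly the true length 
modulo 2^32, which suggests (my inference, not verified against the Spark 
source) that the payload size is being truncated to 32 bits somewhere in the 
broadcast path:

{code:python}
# The recovered length equals the true length mod 2**32, i.e. the upper
# bits of a 64-bit size have been silently dropped.
>>> 6331339840 % 2 ** 32
2036372544
>>> 6331339840 - 2 ** 32
2036372544
{code}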

**BLOCK 9**  **[ERROR 3: unhandled error from zlib.compress inside sc.broadcast]**
check_unserialized(29)
......
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     418 
>     419     def dumps(self, obj):
> --> 420         return zlib.compress(self.serializer.dumps(obj), 1)
>     421 
>     422     def loads(self, obj):
> 
> OverflowError: size does not fit in an int
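
This one also appears to reproduce without Spark: zlib's C bindings in 
Python 2 take the buffer length as a C int, so any input of 2^31 bytes or 
more overflows. A minimal standalone sketch (my own repro, same assumptions 
as the cPickle one above):

{code:python}
# Standalone repro of ERROR 3 (no Spark involved): compressing a buffer
# whose length does not fit in a C int raises the same OverflowError seen
# in the traceback above.
import zlib
zlib.compress('x' * (2 ** 31), 1)
# OverflowError: size does not fit in an int
{code}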

**BLOCK 10**  [ERROR 1]
check_pre_serialized(30)
...same as above...

**BLOCK 11**  [ERROR 3]
check_unserialized(30)
...same as above...
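
Until this is fixed, one possible workaround is to keep each individual 
broadcast under the 2 GB limit by chunking the serialized payload. This is an 
untested sketch of my own (broadcast_chunked and reassemble are hypothetical 
helpers, not Spark APIs):

{code:python}
# Hypothetical workaround sketch (untested): split a large serialized blob
# into sub-2GB chunks, broadcast each chunk separately, and concatenate the
# chunk values back together before unpickling.
def broadcast_chunked(sc, blob, chunk_size=2 ** 30):
    chunks = [blob[i:i + chunk_size]
              for i in xrange(0, len(blob), chunk_size)]
    return [sc.broadcast(chunk) for chunk in chunks]

def reassemble(bvars):
    return ''.join(b.value for b in bvars)
{code}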


> Broadcast Variables above 2GB break in PySpark
> ----------------------------------------------
>
>                 Key: SPARK-3721
>                 URL: https://issues.apache.org/jira/browse/SPARK-3721
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.1.0
>            Reporter: Brad Miller
>


