[ https://issues.apache.org/jira/browse/SPARK-3721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad Miller updated SPARK-3721:
-------------------------------
    Description: 
This bug manifests as three distinct failure modes in PySpark, all of which 
appear to be tied to broadcast variable size. Note that the tests below ran 
Python 2.7.3 on all machines and used the Spark 1.1.0 binaries.

**BLOCK 1** [no problem]
{noformat}
import cPickle
from pyspark import SparkContext

# Broadcast a value that has already been pickled by hand, then check that
# the exact bytes survive the round trip through the broadcast machinery.
def check_pre_serialized(size):
    msg = cPickle.dumps(range(2 ** size))
    print 'serialized length:', len(msg)
    bvar = sc.broadcast(msg)
    print 'length recovered from broadcast variable:', len(bvar.value)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

# Broadcast the raw Python object and let PySpark serialize it internally.
def check_unserialized(size):
    msg = range(2 ** size)
    bvar = sc.broadcast(msg)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.cores.max', '5')
sc = SparkContext('spark://crosby.research.intel-research.net:7077',
                  'broadcast_bug')
{noformat}

**BLOCK 2**  [no problem]
{noformat}
check_pre_serialized(20)
> serialized length: 9374656
> length recovered from broadcast variable: 9374656
> correct value recovered: True
{noformat}

**BLOCK 3**  [no problem]
{noformat}
check_unserialized(20)
> correct value recovered: True
{noformat}

**BLOCK 4**  [no problem]
{noformat}
check_pre_serialized(27)
> serialized length: 1499501632
> length recovered from broadcast variable: 1499501632
> correct value recovered: True
{noformat}

**BLOCK 5**  [no problem]
{noformat}
check_unserialized(27)
> correct value recovered: True
{noformat}

**BLOCK 6**  **[ERROR 1: unhandled error from cPickle.dumps inside sc.broadcast]**
{noformat}
check_pre_serialized(28)
.....
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     354
>     355     def dumps(self, obj):
> --> 356         return cPickle.dumps(obj, 2)
>     357
>     358     loads = cPickle.loads
>
> SystemError: error return without exception set
{noformat}
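
For reference, ERROR 1 appears to originate in cPickle itself rather than in Spark: Python 2's cPickle cannot reliably write a single string token whose size exceeds 2**31 - 1 bytes. A minimal standalone sketch (editor's illustration, not part of the original report) that exercises the same limit without a cluster:
{noformat}
import cPickle

# Editor's sketch (assumes Python 2.7 on a 64-bit machine with ample RAM).
# A string just over 2**31 bytes hits the same 32-bit size limit that
# sc.broadcast trips through serializers.py; on the reporter's build it
# surfaced as "SystemError: error return without exception set".
big = 'x' * (2 ** 31 + 1)
try:
    cPickle.dumps(big, 2)
except Exception as e:
    print 'pickling >2GB failed:', repr(e)
{noformat}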

**BLOCK 7**  [no problem]
{noformat}
check_unserialized(28)
> correct value recovered: True
{noformat}

**BLOCK 8**  **[ERROR 2: no error occurs and *incorrect result* is returned]**
{noformat}
check_pre_serialized(29)
> serialized length: 6331339840
> length recovered from broadcast variable: 2036372544
> correct value recovered: False
{noformat}
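
Worth noting (editor's observation, not in the original report): the recovered length is exactly the serialized length modulo 2**32, which points at a length field being truncated to 32 bits somewhere in the broadcast path:
{noformat}
>>> 6331339840 % 2 ** 32
2036372544
>>> 6331339840 - 4294967296
2036372544
{noformat}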

**BLOCK 9**  **[ERROR 3: unhandled error from zlib.compress inside sc.broadcast]**
{noformat}
check_unserialized(29)
......
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     418 
>     419     def dumps(self, obj):
> --> 420         return zlib.compress(self.serializer.dumps(obj), 1)
>     421 
>     422     def loads(self, obj):
> 
> OverflowError: size does not fit in an int
{noformat}
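
ERROR 3 also reproduces without Spark: in CPython 2, zlib.compress takes its input length as a C int, so a single call on more than 2**31 - 1 bytes raises "OverflowError: size does not fit in an int". One possible workaround (editor's sketch, not necessarily the fix adopted in PySpark) is to stream the payload through zlib.compressobj in sub-2GB chunks:
{noformat}
import zlib

# Editor's sketch: compress a large byte string in chunks so that no single
# zlib call sees more than INT_MAX bytes. The 256MB chunk size is arbitrary.
def compress_large(data, level=1, chunk=256 * 1024 * 1024):
    c = zlib.compressobj(level)
    out = []
    for i in xrange(0, len(data), chunk):
        out.append(c.compress(data[i:i + chunk]))
    out.append(c.flush())
    return ''.join(out)
{noformat}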

**BLOCK 10**  [ERROR 1]
{noformat}
check_pre_serialized(30)
...same as above...
{noformat}

**BLOCK 11**  [ERROR 3]
{noformat}
check_unserialized(30)
...same as above...
{noformat}
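
Taken together (editor's summary, not in the original report), all three failure modes appear at sizes above the 2**31 - 1 byte ceiling of a signed 32-bit int, each in a different layer: cPickle appears unable to write a single string token larger than that (ERROR 1), zlib.compress rejects input larger than that (ERROR 3), and some length header wraps modulo 2**32 (ERROR 2), matching the 2GB threshold in the issue title:
{noformat}
>>> 2 ** 31 - 1                 # largest size a signed 32-bit int can hold
2147483647
>>> 1499501632 < 2 ** 31 - 1    # BLOCK 4 payload: round-trips correctly
True
>>> 6331339840 > 2 ** 31 - 1    # BLOCK 8 payload: silently truncated
True
{noformat}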





> Broadcast Variables above 2GB break in PySpark
> ----------------------------------------------
>
>                 Key: SPARK-3721
>                 URL: https://issues.apache.org/jira/browse/SPARK-3721
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.1.0
>            Reporter: Brad Miller
>


