Brad Miller created SPARK-3721:
----------------------------------

             Summary: Broadcast Variables above 2GB break in PySpark
                 Key: SPARK-3721
                 URL: https://issues.apache.org/jira/browse/SPARK-3721
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 1.1.0
            Reporter: Brad Miller
Attempting to reproduce the bug in isolation in an IPython notebook, I've observed the following three distinct failure modes, all of which seem to be related to broadcast variable size. Note that I'm running Python 2.7.3 on all machines and using the Spark 1.1.0 binaries.

**BLOCK 1** [no problem]

import cPickle
from pyspark import SparkContext

def check_pre_serialized(size):
    msg = cPickle.dumps(range(2 ** size))
    print 'serialized length:', len(msg)
    bvar = sc.broadcast(msg)
    print 'length recovered from broadcast variable:', len(bvar.value)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

def check_unserialized(size):
    msg = range(2 ** size)
    bvar = sc.broadcast(msg)
    print 'correct value recovered:', msg == bvar.value
    bvar.unpersist()

SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.cores.max', '5')
sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'broadcast_bug')

**BLOCK 2** [no problem]

check_pre_serialized(20)
> serialized length: 9374656
> length recovered from broadcast variable: 9374656
> correct value recovered: True

**BLOCK 3** [no problem]

check_unserialized(20)
> correct value recovered: True

**BLOCK 4** [no problem]

check_pre_serialized(27)
> serialized length: 1499501632
> length recovered from broadcast variable: 1499501632
> correct value recovered: True

**BLOCK 5** [no problem]

check_unserialized(27)
> correct value recovered: True

**BLOCK 6** [ERROR 1: unhandled error from cPickle.dumps inside sc.broadcast]

check_pre_serialized(28)
.....
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     354
>     355     def dumps(self, obj):
> --> 356         return cPickle.dumps(obj, 2)
>     357
>     358     loads = cPickle.loads
>
> SystemError: error return without exception set

**BLOCK 7** [no problem]

check_unserialized(28)
> correct value recovered: True

**BLOCK 8** [ERROR 2: no error occurs and *incorrect result* is returned]

check_pre_serialized(29)
> serialized length: 6331339840
> length recovered from broadcast variable: 2036372544
> correct value recovered: False

(Note that 2036372544 = 6331339840 mod 2^32, which suggests the length is being truncated to a 32-bit value somewhere along the way.)

**BLOCK 9** [ERROR 3: unhandled error from zlib.compress inside sc.broadcast]

check_unserialized(29)
......
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
>     418
>     419     def dumps(self, obj):
> --> 420         return zlib.compress(self.serializer.dumps(obj), 1)
>     421
>     422     def loads(self, obj):
>
> OverflowError: size does not fit in an int

**BLOCK 10** [ERROR 1]

check_pre_serialized(30)
...same as above...

**BLOCK 11** [ERROR 3]

check_unserialized(30)
...same as above...
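For reference, the two unhandled errors appear to have counterparts in plain Python 2.7 with no Spark involved, which would suggest the underlying 32-bit limits live in cPickle and zlib themselves rather than in PySpark. The sketch below is only an assumption-laden reproduction: the 2 ** 31 + 1 size is a guess at where the signed 32-bit boundary lies, it needs several GB of free memory, and the exact exception raised may vary by Python build.

# Standalone Python 2.7 sketch (no Spark); sizes and expected failures are
# assumptions based on the tracebacks above, not a confirmed diagnosis.
import cPickle
import zlib

big = 'x' * (2 ** 31 + 1)  # just over the signed 32-bit boundary (~2 GiB)

try:
    # mirrors the failing call in pyspark/serializers.py (pickle protocol 2)
    cPickle.dumps(big, 2)
except SystemError as e:
    print 'cPickle.dumps failed:', e   # expected to match ERROR 1

try:
    # mirrors the failing zlib.compress(..., 1) call in pyspark/serializers.py
    zlib.compress(big, 1)
except OverflowError as e:
    print 'zlib.compress failed:', e   # expected to match ERROR 3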