Parth Gandhi created SPARK-26658: ------------------------------------ Summary: Pyspark job is unable to serialize large objects Key: SPARK-26658 URL: https://issues.apache.org/jira/browse/SPARK-26658 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 3.0.0 Reporter: Parth Gandhi
When a PySpark job running on Python 3 tries to serialize a large object (larger than 4 GiB), serialization fails: pickling a global variable referenced by a task function raises a struct.error from pickle, and broadcasting the object raises an OverflowError. global object: {code:java} Traceback (most recent call last): File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 235, in dump return Pickler.dump(self, obj) File "/home/var/python36/lib/python3.6/pickle.py", line 409, in dump self.save(obj) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple save(element) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function self.save_function_tuple(obj) File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple save(closure_values) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list self._batch_appends(obj) File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends save(x) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function self.save_function_tuple(obj) File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple save(closure_values) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list self._batch_appends(obj) File 
"/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends save(x) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function self.save_function_tuple(obj) File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple save(closure_values) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list self._batch_appends(obj) File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends save(x) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function self.save_function_tuple(obj) File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple save(closure_values) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list self._batch_appends(obj) File "/home/var/python36/lib/python3.6/pickle.py", line 808, in _batch_appends save(tmp[0]) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 372, in save_function self.save_function_tuple(obj) File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 525, in save_function_tuple save(f_globals) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File 
"/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict self._batch_setitems(obj.items()) File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems save(v) File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save self.save_reduce(obj=obj, *rv) File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce save(state) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict self._batch_setitems(obj.items()) File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems save(v) File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save self.save_reduce(obj=obj, *rv) File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce save(state) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple save(element) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 693, in save_bytes (str(obj, 'latin1'), 'latin1'), obj=obj) File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 786, in save_reduce save(args) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 736, in save_tuple save(element) File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/var/python36/lib/python3.6/pickle.py", line 714, in save_str self.write(BINUNICODE + pack("<I", n) + encoded) struct.error: 'I' format requires 0 <= 
number <= 4294967295 {code} broadcast: {code:java} Traceback (most recent call last): File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump pickle.dump(value, f, 2) OverflowError: cannot serialize a string larger than 4GiB Traceback (most recent call last): File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump pickle.dump(value, f, 2) OverflowError: cannot serialize a string larger than 4GiB {code} Steps to Reproduce: - To reproduce the above issue, I am using the word2vec model trained on the Google News dataset downloaded from [https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing] - Use python 3.x with module gensim installed (or ship the module zip file using --py-files). Launch pyspark with the following command: {code:java} bin/pyspark --master yarn --py-files additionalPythonModules.zip --conf spark.driver.memory=16g --conf spark.executor.memory=16g --conf spark.driver.memoryOverhead=16g --conf spark.executor.memoryOverhead=16g --conf spark.executor.pyspark.memory=16g{code} For the sake of reproducing the issue, I have simply pasted certain parts of the code here: {code:java} Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 19/01/17 21:16:46 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 19/01/17 21:16:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.3.3-SNAPSHOT /_/ Using Python version 3.6.3 (default, Jun 19 2018 22:39:11) SparkSession available as 'spark'. 
>>> import gensim /home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version! RequestsDependencyWarning) >>> score_threshold = 0.65 >>> synonym_limit = 3 >>> model = >>> gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', >>> binary=True) >>> def isPhrase(word): ... if word.find('_') != -1 : ... return 1 ... return 0 ... >>> def process_word(line): ... word = "test" ... positiveWords = [] ... positiveWords.append(word) ... try : ... results = model.most_similar(positive=positiveWords) ... synonym_vec = [] ... for i in range(len(results)) : ... result = results[i] ... if (result[1] > score_threshold ) : ... synonym = result[0] ... synonym = synonym.lower() ... if (isPhrase(synonym)==0) and (word != synonym) : ... synonym_vec.append(synonym) ... if len(synonym_vec) > synonym_limit : ... break ... if len(synonym_vec) > 0 : ... #print(word +"\t"+ ",".join(synonym_vec)) ... return (word, ",".join(synonym_vec)) ... except KeyError : ... sys.stderr.write("key error: " + word + "\n") ... >>> if __name__ == "__main__": ... rdd = sc.parallelize(["test1", "test2", "test3"]) ... rdd2 = rdd.map(process_word) ... rdd2.count() ... {code} * For reproducing the issue with broadcast, simply run the code below in pyspark shell: {code:java} Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 19/01/17 19:31:10 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 19/01/17 19:31:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME. 
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.3.3-SNAPSHOT /_/ Using Python version 3.6.3 (default, Jun 19 2018 22:39:11) SparkSession available as 'spark'. >>> import gensim /home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version! RequestsDependencyWarning) >>> model = >>> sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format('hdfs://user/pgandhi/GoogleNews-vectors-negative300.bin', >>> binary=True)) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org