[ https://issues.apache.org/jira/browse/SPARK-26658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Parth Gandhi updated SPARK-26658:
---------------------------------
Description:

When a PySpark job running on Python 3 tries to serialize an object whose pickled form exceeds 4 GiB, serialization fails: a pickle {{struct.error}} is thrown when the object is referenced as a global variable from a task closure, and an {{OverflowError}} is thrown when the object is broadcast.

global object:
{code:java}
Traceback (most recent call last):
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
    save(state)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
    save(state)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 693, in save_bytes
    (str(obj, 'latin1'), 'latin1'), obj=obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 786, in save_reduce
    save(args)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 714, in save_str
    self.write(BINUNICODE + pack("<I", n) + encoded)
struct.error: 'I' format requires 0 <= number <= 4294967295
{code}

broadcast:
{code:java}
Traceback (most recent call last):
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump
    pickle.dump(value, f, 2)
OverflowError: cannot serialize a string larger than 4GiB
Traceback (most recent call last):
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump
    pickle.dump(value, f, 2)
OverflowError: cannot serialize a string larger than 4GiB
{code}
Steps to Reproduce:
- To reproduce the issue, I am using the word2vec model trained on the Google News dataset, downloaded from [https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing]
- Use Python 3.x with the gensim module installed (or ship the module as a zip file using --py-files).

Launch pyspark with the following command:
{code:java}
bin/pyspark --master yarn --py-files additionalPythonModules.zip --conf spark.driver.memory=16g --conf spark.executor.memory=16g --conf spark.driver.memoryOverhead=16g --conf spark.executor.memoryOverhead=16g --conf spark.executor.pyspark.memory=16g
{code}
For the sake of reproducing the issue, I have pasted the relevant parts of the shell session here:
{code:java}
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/17 21:16:46 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/01/17 21:16:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
      /_/

Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
SparkSession available as 'spark'.
>>> import sys
>>> import gensim
/home/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
  RequestsDependencyWarning)
>>> score_threshold = 0.65
>>> synonym_limit = 3
>>> model = gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True)
>>> def isPhrase(word):
...     if word.find('_') != -1 :
...         return 1
...     return 0
...
>>> def process_word(line):
...     word = "test"
...     positiveWords = []
...     positiveWords.append(word)
...     try :
...         results = model.most_similar(positive=positiveWords)
...         synonym_vec = []
...         for i in range(len(results)) :
...             result = results[i]
...             if (result[1] > score_threshold ) :
...                 synonym = result[0]
...                 synonym = synonym.lower()
...                 if (isPhrase(synonym)==0) and (word != synonym) :
...                     synonym_vec.append(synonym)
...                 if len(synonym_vec) > synonym_limit :
...                     break
...         if len(synonym_vec) > 0 :
...             #print(word +"\t"+ ",".join(synonym_vec))
...             return (word, ",".join(synonym_vec))
...     except KeyError :
...         sys.stderr.write("key error: " + word + "\n")
...
>>> if __name__ == "__main__":
...     rdd = sc.parallelize(["test1", "test2", "test3"])
...     rdd2 = rdd.map(process_word)
...     rdd2.count()
...
{code}
* For reproducing the issue with broadcast, simply run the code below in the pyspark shell:
{code:java}
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/17 19:31:10 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/01/17 19:31:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
      /_/

Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
SparkSession available as 'spark'.
>>> import gensim
/home/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
  RequestsDependencyWarning)
>>> model = sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True))
{code}
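Note: the same two errors can be reproduced without gensim or the Google News model; any value whose pickled form exceeds 4 GiB triggers them. A hypothetical minimal repro follows (run each path separately in the pyspark shell; it assumes {{sc}} is already defined and that the driver's Python process has roughly 9 GiB of memory free for the string plus its encoded copy):
{code:python}
# Any value whose pickled form exceeds 4 GiB will do; a plain string
# keeps the sketch free of third-party dependencies.
big_value = "x" * (2**32 + 1)

# Closure/global-variable path: cloudpickle captures big_value via
# f_globals and fails with
#   struct.error: 'I' format requires 0 <= number <= 4294967295
def use_big(x):
    return len(big_value)

rdd = sc.parallelize([1, 2, 3])
rdd.map(use_big).count()

# Broadcast path: broadcast.py's pickle.dump(value, f, 2) fails with
#   OverflowError: cannot serialize a string larger than 4GiB
bc = sc.broadcast(big_value)
{code}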
> Pyspark job is unable to serialize large objects
> -------------------------------------------------
>
>                 Key: SPARK-26658
>                 URL: https://issues.apache.org/jira/browse/SPARK-26658
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Parth Gandhi
>            Priority: Minor
"/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 378, in save_function > self.save_function_tuple(obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 529, in save_function_tuple > save(closure_values) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list > self._batch_appends(obj) > File "/home/var/python36/lib/python3.6/pickle.py", line 808, in > _batch_appends > save(tmp[0]) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 372, in save_function > self.save_function_tuple(obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 525, in save_function_tuple > save(f_globals) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict > self._batch_setitems(obj.items()) > File "/home/var/python36/lib/python3.6/pickle.py", line 847, in > _batch_setitems > save(v) > File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save > self.save_reduce(obj=obj, *rv) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 804, in save_reduce > save(state) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict > self._batch_setitems(obj.items()) > File "/home/var/python36/lib/python3.6/pickle.py", line 847, in > _batch_setitems > save(v) > File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save > self.save_reduce(obj=obj, *rv) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 804, in save_reduce > save(state) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple > save(element) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 693, in save_bytes > (str(obj, 'latin1'), 'latin1'), obj=obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 786, in save_reduce > save(args) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 736, in save_tuple > save(element) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 714, in save_str > self.write(BINUNICODE + pack("<I", n) + encoded) > struct.error: 'I' format requires 0 <= number <= 4294967295 > {code} > > > > broadcast: > {code:java} > Traceback (most recent call last): > > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", > line 113, in dump > pickle.dump(value, f, 2) > OverflowError: cannot serialize a string larger than 4GiB > Traceback (most recent call last): > File 
"/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", > line 113, in dump > pickle.dump(value, f, 2) > OverflowError: cannot serialize a string larger than 4GiB > {code} > > > Steps to Reproduce: > - To reproduce the above issue, I am using the word2vec model trained on the > Google News dataset downloaded from > [https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing] > - Use python 3.x with module gensim installed(or ship the module zip file > using --py-files). > Launch pyspark with the following command: > {code:java} > bin/pyspark --master yarn --py-files additionalPythonModules.zip --conf > spark.driver.memory=16g --conf spark.executor.memory=16g --conf > spark.driver.memoryOverhead=16g --conf spark.executor.memoryOverhead=16g > --conf spark.executor.pyspark.memory=16g{code} > For the sake of reproducing the issue, I have simply pasted certain parts of > the code here: > > {code:java} > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/01/17 21:16:46 WARN DomainSocketFactory: The short-circuit local reads > feature cannot be used because libhadoop cannot be loaded. > 19/01/17 21:16:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive > is set, falling back to uploading libraries under SPARK_HOME. > Welcome to > ____ __ > / __/__ ___ _____/ /__ > _\ \/ _ \/ _ `/ __/ '_/ > /__ / .__/\_,_/_/ /_/\_\ version 2.3.3-SNAPSHOT > /_/ > > Using Python version 3.6.3 (default, Jun 19 2018 22:39:11) > SparkSession available as 'spark'. > >>> import gensim > /home/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: > RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match > a supported version! > RequestsDependencyWarning) > >>> score_threshold = 0.65 > >>> synonym_limit = 3 > >>> model = > >>> gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', > >>> binary=True) > >>> def isPhrase(word): > ... if word.find('_') != -1 : > ... return 1 > ... return 0 > ... > >>> def process_word(line): > ... word = "test" > ... positiveWords = [] > ... positiveWords.append(word) > ... try : > ... results = model.most_similar(positive=positiveWords) > ... synonym_vec = [] > ... for i in range(len(results)) : > ... result = results[i] > ... if (result[1] > score_threshold ) : > ... synonym = result[0] > ... synonym = synonym.lower() > ... if (isPhrase(synonym)==0) and (word != synonym) : > ... synonym_vec.append(synonym) > ... if len(synonym_vec) > synonym_limit : > ... break > ... if len(synonym_vec) > 0 : > ... #print(word +"\t"+ ",".join(synonym_vec)) > ... return (word, ",".join(synonym_vec)) > ... except KeyError : > ... sys.stderr.write("key error: " + word + "\n") > ... > >>> if __name__ == "__main__": > ... rdd = sc.parallelize(["test1", "test2", "test3"]) > ... rdd2 = rdd.map(process_word) > ... rdd2.count() > ... > {code} > > * For reproducing the issue with broadcast, simply run the code below in > pyspark shell: > > {code:java} > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/01/17 19:31:10 WARN DomainSocketFactory: The short-circuit local reads > feature cannot be used because libhadoop cannot be loaded. > 19/01/17 19:31:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive > is set, falling back to uploading libraries under SPARK_HOME. 
> Welcome to > ____ __ > / __/__ ___ _____/ /__ > _\ \/ _ \/ _ `/ __/ '_/ > /__ / .__/\_,_/_/ /_/\_\ version 2.3.3-SNAPSHOT > /_/ > > Using Python version 3.6.3 (default, Jun 19 2018 22:39:11) > SparkSession available as 'spark'. > >>> import gensim > /home/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: > RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match > a supported version! > RequestsDependencyWarning) > >>> model = > >>> sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', > >>> binary=True)) > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org