[ https://issues.apache.org/jira/browse/SPARK-26658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-26658: ------------------------------------ Assignee: Apache Spark > Pyspark job is unable to serialize large objects > ------------------------------------------------- > > Key: SPARK-26658 > URL: https://issues.apache.org/jira/browse/SPARK-26658 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 3.0.0 > Reporter: Parth Gandhi > Assignee: Apache Spark > Priority: Minor > > When a pyspark job using python 3 tries to serialize large objects, it throws > a pickle error in case of trying to serialize global variable object and > overflow error in case of broadcast. > global object: > > {code:java} > Traceback (most recent call last): > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 235, in dump > return Pickler.dump(self, obj) > File "/home/var/python36/lib/python3.6/pickle.py", line 409, in dump > self.save(obj) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple > save(element) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 378, in save_function > self.save_function_tuple(obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 529, in save_function_tuple > save(closure_values) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list > self._batch_appends(obj) > File "/home/var/python36/lib/python3.6/pickle.py", line 805, in > _batch_appends > save(x) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 378, in save_function > self.save_function_tuple(obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 529, in save_function_tuple > save(closure_values) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list > self._batch_appends(obj) > File "/home/var/python36/lib/python3.6/pickle.py", line 805, in > _batch_appends > save(x) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 378, in save_function > self.save_function_tuple(obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 529, in save_function_tuple > save(closure_values) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list > self._batch_appends(obj) > File "/home/var/python36/lib/python3.6/pickle.py", line 805, in > _batch_appends > save(x) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 378, in save_function > self.save_function_tuple(obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 529, in save_function_tuple > save(closure_values) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list > self._batch_appends(obj) > File "/home/var/python36/lib/python3.6/pickle.py", line 808, in > _batch_appends > save(tmp[0]) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 372, in save_function > self.save_function_tuple(obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 525, in save_function_tuple > save(f_globals) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict > self._batch_setitems(obj.items()) > File "/home/var/python36/lib/python3.6/pickle.py", line 847, in > _batch_setitems > save(v) > File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save > self.save_reduce(obj=obj, *rv) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 804, in save_reduce > save(state) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict > self._batch_setitems(obj.items()) > File "/home/var/python36/lib/python3.6/pickle.py", line 847, in > _batch_setitems > save(v) > File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save > self.save_reduce(obj=obj, *rv) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 804, in save_reduce > save(state) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple > save(element) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 693, in save_bytes > (str(obj, 'latin1'), 'latin1'), obj=obj) > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", > line 786, in save_reduce > save(args) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 736, in save_tuple > save(element) > File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save > f(self, obj) # Call unbound method with explicit self > File "/home/var/python36/lib/python3.6/pickle.py", line 714, in save_str > self.write(BINUNICODE + pack("<I", n) + encoded) > struct.error: 'I' format requires 0 <= number <= 4294967295 > {code} > > > > broadcast: > {code:java} > Traceback (most recent call last): > > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", > line 113, in dump > pickle.dump(value, f, 2) > OverflowError: cannot serialize a string larger than 4GiB > Traceback (most recent call last): > File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", > line 113, in dump > pickle.dump(value, f, 2) > OverflowError: cannot serialize a string larger than 4GiB > {code} > > > Steps to Reproduce: > - To reproduce the above issue, I am using the word2vec model trained on the > Google News dataset downloaded from > [https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing] > - Use python 3.x with module gensim installed(or ship the module zip file > using --py-files). > Launch pyspark with the following command: > {code:java} > bin/pyspark --master yarn --py-files additionalPythonModules.zip --conf > spark.driver.memory=16g --conf spark.executor.memory=16g --conf > spark.driver.memoryOverhead=16g --conf spark.executor.memoryOverhd=16g --conf > spark.executor.pyspark.memory=16g{code} > For the sake of reproducing the issue, I have simply pasted certain parts of > the code here: > > {code:java} > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/01/17 21:16:46 WARN DomainSocketFactory: The short-circuit local reads > feature cannot be used because libhadoop cannot be loaded. > 19/01/17 21:16:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive > is set, falling back to uploading libraries under SPARK_HOME. > Welcome to > ____ __ > / __/__ ___ _____/ /__ > _\ \/ _ \/ _ `/ __/ '_/ > /__ / .__/\_,_/_/ /_/\_\ version 2.3.3-SNAPSHOT > /_/ > > Using Python version 3.6.3 (default, Jun 19 2018 22:39:11) > SparkSession available as 'spark'. > >>> import gensim > /home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: > RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match > a supported version! > RequestsDependencyWarning) > >>> score_threshold = 0.65 > >>> synonym_limit = 3 > >>> model = > >>> gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', > >>> binary=True) > >>> def isPhrase(word): > ... if word.find('_') != -1 : > ... return 1 > ... return 0 > ... > >>> def process_word(line): > ... word = "test" > ... positiveWords = [] > ... positiveWords.append(word) > ... try : > ... results = model.most_similar(positive=positiveWords) > ... synonym_vec = [] > ... for i in range(len(results)) : > ... result = results[i] > ... if (result[1] > score_threshold ) : > ... synonym = result[0] > ... synonym = synonym.lower() > ... if (isPhrase(synonym)==0) and (word != synonym) : > ... synonym_vec.append(synonym) > ... if len(synonym_vec) > synonym_limit : > ... break > ... if len(synonym_vec) > 0 : > ... #print(word +"\t"+ ",".join(synonym_vec)) > ... return (word, ",".join(synonym_vec)) > ... except KeyError : > ... sys.stderr.write("key error: " + word + "\n") > ... > >>> if __name__ == "__main__": > ... rdd = sc.parallelize(["test1", "test2", "test3"]) > ... rdd2 = rdd.map(process_word) > ... rdd2.count() > ... > {code} > > * For reproducing the issue with broadcast, simply run the code below in > pyspark shell: > > {code:java} > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/01/17 19:31:10 WARN DomainSocketFactory: The short-circuit local reads > feature cannot be used because libhadoop cannot be loaded. > 19/01/17 19:31:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive > is set, falling back to uploading libraries under SPARK_HOME. > Welcome to > ____ __ > / __/__ ___ _____/ /__ > _\ \/ _ \/ _ `/ __/ '_/ > /__ / .__/\_,_/_/ /_/\_\ version 2.3.3-SNAPSHOT > /_/ > > Using Python version 3.6.3 (default, Jun 19 2018 22:39:11) > SparkSession available as 'spark'. > >>> import gensim > /home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: > RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match > a supported version! > RequestsDependencyWarning) > >>> model = > >>> sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format('hdfs://user/pgandhi/GoogleNews-vectors-negative300.bin', > >>> binary=True)) > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org