[ 
https://issues.apache.org/jira/browse/SPARK-26658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26658:
------------------------------------

    Assignee:     (was: Apache Spark)

> Pyspark job is unable to serialize large objects 
> -------------------------------------------------
>
>                 Key: SPARK-26658
>                 URL: https://issues.apache.org/jira/browse/SPARK-26658
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Parth Gandhi
>            Priority: Minor
>
> When a PySpark job running on Python 3 tries to serialize a large object, it 
> fails: pickling a large object referenced as a global variable raises a 
> struct.error, and broadcasting a large object raises an OverflowError.
> Global object:
>  
> {code:java}
> Traceback (most recent call last):
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 235, in dump
>     return Pickler.dump(self, obj)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 409, in dump
>     self.save(obj)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
>     save(element)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
>     self.save_function_tuple(obj)
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
>     save(closure_values)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
>     self._batch_appends(obj)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
>     save(x)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
>     self.save_function_tuple(obj)
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
>     save(closure_values)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
>     self._batch_appends(obj)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
>     save(x)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
>     self.save_function_tuple(obj)
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
>     save(closure_values)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
>     self._batch_appends(obj)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
>     save(x)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
>     self.save_function_tuple(obj)
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
>     save(closure_values)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
>     self._batch_appends(obj)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 808, in _batch_appends
>     save(tmp[0])
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 372, in save_function
>     self.save_function_tuple(obj)
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 525, in save_function_tuple
>     save(f_globals)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
>     self._batch_setitems(obj.items())
>   File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
>     save(v)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
>     save(state)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
>     self._batch_setitems(obj.items())
>   File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
>     save(v)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
>     save(state)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
>     save(element)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 693, in save_bytes
>     (str(obj, 'latin1'), 'latin1'), obj=obj)
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 786, in save_reduce
>     save(args)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 736, in save_tuple
>     save(element)
>   File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/home/var/python36/lib/python3.6/pickle.py", line 714, in save_str
>     self.write(BINUNICODE + pack("<I", n) + encoded)
> struct.error: 'I' format requires 0 <= number <= 4294967295
> {code}
>  
>  
>  
> broadcast:
> {code:java}
> Traceback (most recent call last):
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump
>     pickle.dump(value, f, 2)
> OverflowError: cannot serialize a string larger than 4GiB
> Traceback (most recent call last):
>   File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump
>     pickle.dump(value, f, 2)
> OverflowError: cannot serialize a string larger than 4GiB
> {code}
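
Both tracebacks above appear to hit the same limit: pickle protocols below 4
store the length of a str or bytes payload in a 4-byte unsigned field, so a
payload of 4 GiB or more cannot be written. The first traceback fails in
pack("<I", n) and the second in pickle.dump(value, f, 2). The snippet below is
only a minimal sketch of that limit in plain Python, not Spark code; the helper
name fits_in_uint32 is made up for illustration.

```python
import pickle
import struct

UINT32_MAX = 0xFFFFFFFF  # largest length a 4-byte "<I" field can hold


def fits_in_uint32(length):
    """Return True if `length` fits the 4-byte length prefix packed with "<I"."""
    try:
        struct.pack("<I", length)
        return True
    except struct.error:
        return False


# One byte short of 4 GiB still fits; exactly 4 GiB (2**32 bytes) does not.
print(fits_in_uint32(UINT32_MAX))
print(fits_in_uint32(2 ** 32))

# Protocol 4 (Python 3.4+) adds opcodes such as BINUNICODE8 that use an
# 8-byte "<Q" length prefix instead.
print(pickle.HIGHEST_PROTOCOL >= 4)
print(len(struct.pack("<Q", 2 ** 32)))
```

If this reading is right, serializing with protocol 4 or higher would avoid
this particular limit; whether PySpark can switch protocols in these two code
paths is a separate question for the fix.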
>  
>  
> Steps to Reproduce:
> - To reproduce the above issue, I am using the word2vec model trained on the 
> Google News dataset, downloaded from 
> [https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing]
> - Use Python 3.x with the gensim module installed (or ship the module zip 
> file using --py-files).
> - Launch pyspark with the following command:
> {code:java}
> bin/pyspark --master yarn --py-files additionalPythonModules.zip \
>   --conf spark.driver.memory=16g --conf spark.executor.memory=16g \
>   --conf spark.driver.memoryOverhead=16g \
>   --conf spark.executor.memoryOverhead=16g \
>   --conf spark.executor.pyspark.memory=16g
> {code}
> To reproduce the issue, I have pasted only the relevant parts of the code 
> here:
>  
> {code:java}
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 19/01/17 21:16:46 WARN DomainSocketFactory: The short-circuit local reads 
> feature cannot be used because libhadoop cannot be loaded.
> 19/01/17 21:16:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive 
> is set, falling back to uploading libraries under SPARK_HOME.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
>       /_/
>  
> Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
> SparkSession available as 'spark'.
> >>> import gensim
> /home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: 
> RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match 
> a supported version!
>   RequestsDependencyWarning)
> >>> score_threshold = 0.65
> >>> synonym_limit = 3
> >>> import sys
> >>> model = gensim.models.KeyedVectors.load_word2vec_format(
> ...     'hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True)
> >>> def isPhrase(word):
> ...     if word.find('_') != -1 :
> ...         return 1
> ...     return 0
> ... 
> >>> def process_word(line):
> ...             word = "test"
> ...             positiveWords = []
> ...             positiveWords.append(word)
> ...             try :
> ...                results = model.most_similar(positive=positiveWords)
> ...                synonym_vec = []
> ...                for i in range(len(results)) :
> ...                   result = results[i]
> ...                   if (result[1] > score_threshold ) :
> ...                       synonym = result[0]
> ...                       synonym = synonym.lower()
> ...                       if (isPhrase(synonym)==0) and (word != synonym) :
> ...                           synonym_vec.append(synonym)
> ...                   if len(synonym_vec) > synonym_limit :
> ...                       break
> ...                if  len(synonym_vec) > 0 :
> ...                    #print(word +"\t"+ ",".join(synonym_vec))
> ...                    return (word, ",".join(synonym_vec))
> ...             except KeyError :
> ...                sys.stderr.write("key error: " + word + "\n")
> ... 
> >>> if __name__ == "__main__":
> ...   rdd = sc.parallelize(["test1", "test2", "test3"])
> ...   rdd2 = rdd.map(process_word)
> ...   rdd2.count()
> ...
> {code}
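
The save(f_globals) frame in the first traceback suggests that cloudpickle
collects the globals a function refers to and pickles them along with the
function, which would explain why the whole model ends up inside the serialized
process_word. A rough illustration in plain Python, without Spark or gensim:
the small dict standing in for the model and the helper referenced_globals are
assumptions made for this example, and cloudpickle's own logic is more
involved.

```python
# Stand-in for the multi-gigabyte KeyedVectors object from the report.
model = {"test": [0.9, 0.1]}
score_threshold = 0.65


def process_word(line):
    # Looks up the global `model`, just as the function in the report does.
    vector = model.get(line)
    if vector is not None and vector[0] > score_threshold:
        return (line, vector)
    return None


def referenced_globals(func):
    """Names used in func's bytecode that are also keys in its module globals."""
    return sorted(
        name for name in func.__code__.co_names if name in func.__globals__
    )


# Every name listed here would have to travel with the function to the workers.
print(referenced_globals(process_word))
```

Under that assumption, any large object that a mapped function reads from
module scope is serialized together with the function, so its size counts
against the pickle limit.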
>  
>  * To reproduce the issue with broadcast, run the code below in the 
> pyspark shell:
>  
> {code:java}
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 19/01/17 19:31:10 WARN DomainSocketFactory: The short-circuit local reads 
> feature cannot be used because libhadoop cannot be loaded.
> 19/01/17 19:31:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive 
> is set, falling back to uploading libraries under SPARK_HOME.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
>       /_/
>  
> Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
> SparkSession available as 'spark'.
> >>> import gensim
> /home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: 
> RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match 
> a supported version!
>   RequestsDependencyWarning)
> >>> model = sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format(
> ...     'hdfs://user/pgandhi/GoogleNews-vectors-negative300.bin', binary=True))
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
