[ 
https://issues.apache.org/jira/browse/SPARK-34510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy updated SPARK-34510:
--------------------------
    Description: 
I'm running PySpark 3.0.0 on EMR with the project structure below. process.py 
controls the flow of the application and calls code inside the file_processor 
package. The job hangs when the .foreachPartition code located inside 
_s3_repo.py_ is called by _process.py_. When the same .foreachPartition code 
is moved out of _s3_repo.py_ and into _process.py_, it runs fine.
{code:java}
process.py
file_processor
  config        
    spark.py
  repository        
    s3_repo.py
  structure        
    table_creator.py

{code}
*process.py*
{code:java}
from file_processor.structure import table_creator
from file_processor.repository import s3_repo

def process():
    table_creator.create_table()
    s3_repo.save_to_s3()

if __name__ == '__main__':
    process()
{code}
*spark.py*
{code:java}
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("Test").getOrCreate()
{code}
*s3_repo.py* 
{code:java}
from file_processor.config.spark import spark_session

def save_to_s3():
    spark_session.sql('SELECT * FROM rawFileData').toJSON().foreachPartition(_save_to_s3)

def _save_to_s3(iterator):   
    for record in iterator:
        print(record)
{code}
 *table_creator.py*
{code:java}
from file_processor.config.spark import spark_session
from pyspark.sql import Row

def create_table():
    file_contents = [
        {'line_num': 1, 'contents': 'line 1'},
        {'line_num': 2, 'contents': 'line 2'},
        {'line_num': 3, 'contents': 'line 3'}        
    ]
    spark_session.createDataFrame(Row(**row) for row in file_contents).cache().createOrReplaceTempView("rawFileData")
{code}
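
One possible factor, which this report does not confirm: a function defined in an importable module is normally pickled by reference (module name plus function name), so whichever process unpickles it has to import that module and run its top-level code. For s3_repo.py that would include the import of spark.py, which builds a SparkSession at import time. PySpark serializes functions with cloudpickle, which pickles functions defined in __main__ (process.py here) by value instead, and that would be consistent with the behavior described above. The sketch below uses only the standard-library pickle as a stand-in for cloudpickle, and demo_pkg, config.py, repo.py and handle_partition are hypothetical names, not the reporter's code.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Build a throwaway package on disk. All names here are hypothetical stand-ins:
# config.py plays the role of spark.py, repo.py the role of s3_repo.py.
root = Path(tempfile.mkdtemp())
pkg = root / "demo_pkg"
pkg.mkdir()
(pkg / "__init__.py").write_text("")
(pkg / "config.py").write_text("session = object()  # created at import time\n")
(pkg / "repo.py").write_text(
    "from demo_pkg.config import session\n"
    "\n"
    "def handle_partition(iterator):\n"
    "    return list(iterator)\n"
)
sys.path.insert(0, str(root))
importlib.invalidate_caches()

from demo_pkg import repo

# The pickle stores only the module and function names, not the function body.
payload = pickle.dumps(repo.handle_partition)
by_reference = b"demo_pkg.repo" in payload and b"handle_partition" in payload

# Imitate a fresh worker process by forgetting the already imported modules.
for name in ("demo_pkg", "demo_pkg.config", "demo_pkg.repo"):
    sys.modules.pop(name, None)

# Unpickling re-imports demo_pkg.repo, which in turn runs config.py again.
restored = pickle.loads(payload)
config_reimported = "demo_pkg.config" in sys.modules

print(by_reference, config_reimported, restored(iter(["a", "b"])))
# prints: True True ['a', 'b']
```

If this is what happens on the cluster, creating the session inside a function rather than at module top level, or keeping the partition function in a module that does not import spark.py, would be things to try. Neither is verified here.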


> .foreachPartition command hangs when run inside Python package but works when 
> run from Python file outside the package on EMR
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34510
>                 URL: https://issues.apache.org/jira/browse/SPARK-34510
>             Project: Spark
>          Issue Type: Bug
>          Components: EC2, PySpark
>    Affects Versions: 3.0.0
>            Reporter: Yuriy
>            Priority: Minor
>         Attachments: Code.zip
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
