[ https://issues.apache.org/jira/browse/SPARK-34510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307879#comment-17307879 ]

Sean R. Owen commented on SPARK-34510:
--------------------------------------

Firstly, does this happen on Apache Spark or just EMR?
Are you sure it's not just taking a long time to read data from S3?

> .foreachPartition command hangs when ran inside Python package but works when
> ran from Python file outside the package on EMR
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34510
>                 URL: https://issues.apache.org/jira/browse/SPARK-34510
>             Project: Spark
>          Issue Type: Bug
>          Components: EC2, PySpark
>    Affects Versions: 3.0.0
>            Reporter: Yuriy
>            Priority: Minor
>         Attachments: Code.zip
>
>
> I'm running on EMR Pyspark 3.0.0. with project structure below, process.py is
> what controls the flow of the application and calls code inside the
> _file_processor_ package. The command hangs when the .foreachPartition code
> that is located inside _s3_repo.py_ is called by _process.py_. When the same
> .foreachPartition code is moved from _s3_repo.py_ and placed inside the
> _process.py_ it runs just fine.
> {code:java}
> process.py
> file_processor
>   config
>     spark.py
>   repository
>     s3_repo.py
>   structure
>     table_creator.py
> {code}
> *process.py*
> {code:java}
> from file_processor.structure import table_creator
> from file_processor.repository import s3_repo
>
> def process():
>     table_creator.create_table()
>     s3_repo.save_to_s3()
>
> if __name__ == '__main__':
>     process()
> {code}
> *spark.py*
> {code:java}
> from pyspark.sql import SparkSession
>
> spark_session = SparkSession.builder.appName("Test").getOrCreate()
> {code}
> *s3_repo.py*
> {code:java}
> from file_processor.config.spark import spark_session
>
> def save_to_s3():
>     spark_session.sql('SELECT * FROM rawFileData').toJSON().foreachPartition(_save_to_s3)
>
> def _save_to_s3(iterator):
>     for record in iterator:
>         print(record)
> {code}
> *table_creator.py*
> {code:java}
> from file_processor.config.spark import spark_session
> from pyspark.sql import Row
>
> def create_table():
>     file_contents = [
>         {'line_num': 1, 'contents': 'line 1'},
>         {'line_num': 2, 'contents': 'line 2'},
>         {'line_num': 3, 'contents': 'line 3'}
>     ]
>     spark_session.createDataFrame(Row(**row) for row in file_contents).cache().createOrReplaceTempView("rawFileData")
> {code}
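
One difference between the two layouts that may be worth ruling out, offered as an assumption and not as a diagnosis of this ticket: a function that lives in an importable module is normally serialized by reference (module name plus function name), so whichever process deserializes it has to import that module and run its import-time code. In the layout quoted above, importing s3_repo.py also imports spark.py, which builds a SparkSession at import time. PySpark ships functions with cloudpickle, which, as far as I know, serializes functions defined in `__main__` by value instead, and that would match the "works from process.py" observation. The sketch below uses only the standard library `pickle` module to show the by-reference behavior; `demo_repo`, `save_partition`, `run_demo` and `DEMO_IMPORT_COUNT` are hypothetical names invented for the illustration, and the counter merely stands in for the SparkSession creation.

```python
import importlib
import os
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for s3_repo.py. The module body has a side effect
# (bumping a counter), playing the role of spark.py creating a SparkSession.
MODULE_SOURCE = '''\
import os

os.environ["DEMO_IMPORT_COUNT"] = str(int(os.environ.get("DEMO_IMPORT_COUNT", "0")) + 1)


def save_partition(iterator):
    return list(iterator)
'''


def run_demo():
    """Pickle a module-level function, then unpickle it after 'forgetting' the module."""
    os.environ.pop("DEMO_IMPORT_COUNT", None)
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "demo_repo.py").write_text(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            demo_repo = importlib.import_module("demo_repo")
            count_after_import = int(os.environ["DEMO_IMPORT_COUNT"])

            # Standard pickle stores only the names "demo_repo" and
            # "save_partition", not the function's code.
            payload = pickle.dumps(demo_repo.save_partition)

            # Simulate a process that has not imported the module yet
            # (for example a Python worker on an executor).
            del sys.modules["demo_repo"]
            restored = pickle.loads(payload)  # re-imports demo_repo
            count_after_load = int(os.environ["DEMO_IMPORT_COUNT"])
            result = restored(iter(["a", "b"]))
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("demo_repo", None)
    return count_after_import, count_after_load, result


if __name__ == "__main__":
    print(run_demo())
```

Running the script prints `(1, 2, ['a', 'b'])`: the counter moves from 1 to 2 because loading the pickled function re-executes the module body. If the same thing happens on the executors here, one way to test the theory would be to create `spark_session` lazily inside a function in spark.py instead of at import time and see whether the hang goes away.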