Haitham Eltaweel created SPARK-44679:
----------------------------------------

             Summary: java.lang.OutOfMemoryError: Requested array size exceeds 
VM limit
                 Key: SPARK-44679
                 URL: https://issues.apache.org/jira/browse/SPARK-44679
             Project: Spark
          Issue Type: Bug
          Components: EC2, PySpark
    Affects Versions: 3.2.1
         Environment: We use Amazon EMR to run Pyspark jobs.
Amazon EMR version : emr-6.7.0
Installed applications : 
Tez 0.9.2, Spark 3.2.1, Hive 3.1.3, Sqoop 1.4.7, Hadoop 3.2.1, Zookeeper 3.5.7, 
HCatalog 3.1.3, Livy 0.7.1
            Reporter: Haitham Eltaweel


We get the following error from our PySpark application in our production environment:

_java.lang.OutOfMemoryError: Requested array size exceeds VM limit_

I simplified the code we used and shared it below so you can easily investigate 
the issue.

We use PySpark to read a 900 MB text file that contains a single record. We use the foreach action to iterate over the DataFrame and apply a function to each row. The error occurs as soon as the foreach action is triggered. I think the issue is related to the int-sized byte array used to hold the serialized DataFrame row: since the record is so large, the serialized record appears to exceed the maximum integer value (the JVM array size limit), hence the error.

Note that the same error occurs when using the foreachBatch function with writeStream.
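
For reference, a simplified sketch of that streaming variant is below; the source path is illustrative, but the shape of the job matches what we run:

from pyspark.sql import SparkSession

def check_file_name(row):
    print("check_file_name called")

def process_batch(batch_df, batch_id):
    # Same per-row processing as the batch job; the error is raised here as well
    batch_df.foreach(check_file_name)

def main():
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    # Illustrative path; wholetext=True again turns each large file into one record
    streamDF = spark.readStream.text("s3://bucket-name/common/source/", wholetext=True)
    query = streamDF.writeStream.foreachBatch(process_batch).start()
    query.awaitTermination()

if __name__ == "__main__":
    main()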

Our production data has many records larger than 100 MB. We would appreciate a fix or a workaround for this issue.

 

*Find below the code snippet:*
from pyspark.sql import SparkSession, functions as f

def check_file_name(row):
    print("check_file_name called")

def main():
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    inputPath = "s3://bucket-name/common/source/"
    # wholetext=True loads each file as a single record, so the 900 MB file
    # becomes one very large row
    inputDF = spark.read.text(inputPath, wholetext=True)
    inputDF = inputDF.select(
        f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
        f.col("value").alias("raw_data"),
        f.input_file_name().alias("input_file_name"))
    # The OutOfMemoryError is raised once this action forces serialization of the rows
    inputDF.foreach(check_file_name)

if __name__ == "__main__":
    main()
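
For local reproduction without our S3 data, a single-record file of comparable size can be generated with a small script such as the following (size and path are illustrative):

# Writes roughly 900 MB into one file with no newlines, so spark.read.text
# sees it as a single huge record. Path and size are illustrative only.
with open("/tmp/big_single_record.txt", "w") as out:
    chunk = "x" * (1024 * 1024)  # 1 MB chunk
    for _ in range(900):
        out.write(chunk)

Pointing inputPath in the snippet above at a file like this should exercise the same code path.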
*Find below the spark-submit command used:*

spark-submit --master yarn --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --num-executors 15 
--executor-cores 4 --executor-memory 20g --driver-memory 20g --name haitham_job 
--deploy-mode cluster big_file_process.py


