It seems like you are having memory issues. Examine your settings:

1. Your driver memory setting appears to be too high. It should be a fraction of the total memory provided by YARN.
2. Use the Spark UI to monitor the job's memory consumption. Check the Storage tab to see how memory is being used across caches, data, and shuffle.
3. Check the Executors tab to identify tasks or executors that are experiencing memory issues. Look for tasks with high input sizes or shuffle spills.
4. In YARN mode, consider setting the spark.executor.memoryOverhead property to handle executor overhead. This is important for tasks that require additional memory beyond the executor heap. For example:

   --conf spark.executor.memoryOverhead=1000
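Putting those settings together, a minimal sketch of a submission is shown below. It assumes you are running against YARN (--master yarn), and the sizes (8g driver, 16g executors, 2g overhead) are illustrative assumptions only, not recommendations: size them against what YARN actually allows per container, keeping the driver well below the executors and executor memory plus overhead within the container limit.

# Illustrative sizes only -- assumes YARN; tune to your container limits
spark-submit \
  --master yarn \
  --name ensemble \
  --driver-memory 8g \
  --executor-memory 16g \
  --conf spark.executor.memoryOverhead=2g \
  stem/ensemble_spark.py

You can then watch the Storage and Executors tabs in the Spark UI during the run to see whether the shuffle spills and heap errors go away.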
HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Fri, 23 Feb 2024 at 02:42, Shawn Ligocki <sligo...@gmail.com> wrote:

> Hi, I'm new to Spark and I'm running into a lot of OOM issues while trying
> to scale up my first Spark application. I am running into these issues with
> only 1% of the final expected data size. Can anyone help me understand how
> to properly configure Spark to use limited memory, or how to debug which
> part of my application is causing so much memory trouble?
>
> My logs end up with tons of messages like:
>
>> 24/02/22 10:51:01 WARN TaskMemoryManager: Failed to allocate a page
>> (134217728 bytes), try again.
>> 24/02/22 10:51:01 WARN RowBasedKeyValueBatch: Calling spill() on
>> RowBasedKeyValueBatch. Will not spill but return 0.
>> 24/02/22 10:52:28 WARN Executor: Issue communicating with driver in
>> heartbeater
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000
>> milliseconds]. This timeout is controlled by
>> spark.executor.heartbeatInterval
>> ...
>> 24/02/22 10:58:17 WARN NettyRpcEnv: Ignored message:
>> HeartbeatResponse(false)
>> 24/02/22 10:58:17 WARN HeartbeatReceiver: Removing executor driver with
>> no recent heartbeats: 207889 ms exceeds timeout 120000 ms
>> 24/02/22 10:58:17 ERROR Executor: Exception in task 175.0 in stage 2.0
>> (TID 676)
>> java.lang.OutOfMemoryError: Java heap space
>> ...
>
> Background: The goal of this application is to load a large number of
> parquet files, group by a couple of fields, compute some summarization
> metrics for each group, and write the result out. In Python, basically:
>
>> from pyspark.sql import SparkSession
>> import pyspark.sql.functions as func
>>
>> spark = SparkSession.builder.getOrCreate()
>> df = spark.read.parquet(*pred_paths)
>> df = df.groupBy("point_id", "species_code").agg(
>>     func.count("pred_occ").alias("ensemble_support"))
>> df.write.parquet(output_path)
>
> And I am launching it with:
>
>> spark-submit \
>>   --name ensemble \
>>   --driver-memory 64g --executor-memory 64g \
>>   stem/ensemble_spark.py
>
> I noticed that increasing --driver-memory and --executor-memory did help
> me scale up somewhat, but I cannot increase those forever.
>
> Some details:
>
> - All my tests are currently on a single cluster node (with 128GB RAM
>   & 64 CPU cores) or locally on my laptop (32GB RAM & 12 CPU cores).
>   Eventually, I expect to run this in parallel on the cluster.
> - This is running on Spark 3.0.1 (in the cluster); I'm seeing the same
>   issues with 3.5 on my laptop.
> - The input data is tons of parquet files stored on NFS. For the final
>   application it will be about 50k parquet files ranging in size up to 15GB
>   each. Total size of 100TB, 4 trillion rows, 5 columns. I am currently
>   testing with ~1% of this size: 500 files, 1TB total, 40B rows total.
> - There should only be a max of 100 rows per group. So I expect an
>   output size somewhere in the range 1-5TB, 40-200B rows. For the test: 50GB,
>   2B rows. These output files are also written to NFS.
> - The rows for the same groups are not near each other. Ex: no single
>   parquet file will have any two rows for the same group.
>
> Here are some questions I have:
>
> 1. Does Spark know how much memory is available? Do I need to tell it
>    somehow? Is there other configuration that I should set up for a run like
>    this? I know that 1TB input data is too much to fit in memory, but I
>    assumed that Spark would work on it in small enough batches to fit. Do I
>    need to configure those batches somehow?
> 2. How can I debug what is causing it to OOM?
> 3. Does this have something to do with the fact that I'm loading the
>    data from Parquet files? Or that I'm loading so many different files? Or
>    that I'm loading them from NFS?
> 4. Do I need to configure the reduce step (group and aggregation)
>    differently because of the type of data I have (large numbers of groups,
>    stratified groups)?
>
> Thank you!
> -Shawn Ligocki