Hi, I'm new to Spark and I'm running into a lot of OOM issues while trying
to scale up my first Spark application. I'm hitting these issues with only
1% of the final expected data size. Can anyone help me understand how to
properly configure Spark to use limited memory, or how to debug which part
of my application is causing so much memory trouble?

My logs end up with tons of messages like:

24/02/22 10:51:01 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
24/02/22 10:51:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/22 10:52:28 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
...
24/02/22 10:58:17 WARN NettyRpcEnv: Ignored message: HeartbeatResponse(false)
24/02/22 10:58:17 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 207889 ms exceeds timeout 120000 ms
24/02/22 10:58:17 ERROR Executor: Exception in task 175.0 in stage 2.0 (TID 676)
java.lang.OutOfMemoryError: Java heap space
...


Background: The goal of this application is to load a large number of
Parquet files, group by a couple of fields, compute some summary metrics
for each group, and write the result out. In Python, basically:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func

spark = SparkSession.builder.getOrCreate()

# pred_paths (the list of input Parquet paths) and output_path are defined
# elsewhere in the script.
df = spark.read.parquet(*pred_paths)

# Count the predictions in each (point_id, species_code) group.
df = df.groupBy("point_id", "species_code").agg(
    func.count("pred_occ").alias("ensemble_support"))
df.write.parquet(output_path)


And I am launching it with:

spark-submit \
  --name ensemble \
  --driver-memory 64g --executor-memory 64g \
  stem/ensemble_spark.py


I noticed that increasing --driver-memory and --executor-memory did help me
scale up somewhat, but I can't keep increasing them forever.
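
For example, is explicitly setting the input split size and the number of
shuffle partitions the kind of configuration I should be looking at? The
settings and values below are just guesses on my part from skimming the
docs, not something I've confirmed helps:

# Rough sketch of what I'm considering -- the specific values are made up:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ensemble")
    # Smaller input splits -> more, smaller read tasks? (default is 128MB)
    .config("spark.sql.files.maxPartitionBytes", 64 * 1024 * 1024)
    # More shuffle partitions for the groupBy -> less state per task?
    # (default is 200)
    .config("spark.sql.shuffle.partitions", 2000)
    .getOrCreate()
)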

Some details:

   - All my tests are currently on a single cluster node (with 128GB RAM &
   64 CPU cores) or locally on my laptop (32GB RAM & 12 CPU cores).
   Eventually, I expect to run this in parallel on the cluster.
   - This is running on Spark 3.0.1 on the cluster; I'm seeing the same
   issues with 3.5 on my laptop.
   - The input data is a large number of Parquet files stored on NFS. For
   the final application it will be about 50k Parquet files, ranging in size
   up to 15GB each: 100TB total, 4 trillion rows, 5 columns. I am currently
   testing with ~1% of this size: 500 files, 1TB, 40B rows total.
   - There should be at most 100 rows per group, so I expect an output size
   somewhere in the range of 1-5TB and 40-200B rows. For the test: 50GB, 2B
   rows. The output files are also written to NFS.
   - Rows for the same group are not near each other; for example, no single
   Parquet file will contain two rows for the same group.

Here are some questions I have:

   1. Does Spark know how much memory is available? Do I need to tell it
   somehow? Is there other configuration that I should set up for a run like
   this? I know that 1TB input data is too much to fit in memory, but I
   assumed that Spark would work on it in small enough batches to fit. Do I
   need to configure those batches somehow?
   2. How can I debug what is causing the OOM? (I've put a rough sketch of
   the only thing I've thought to try so far after this list.)
   3. Does this have something to do with the fact that I'm loading the
   data from Parquet files? Or that I'm loading so many different files? Or
   that I'm loading them from NFS?
   4. Do I need to configure the reduce step (the group-by and aggregation)
   differently because of the type of data I have (a huge number of groups,
   with each group's rows scattered across the input files)?
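
For question 2, the only thing I've thought to try so far is printing the
number of input partitions and the physical plan for the aggregation,
roughly as below. I don't know whether this actually helps track down the
OOM, so please correct me if there is a better approach:

# What I mean by "debugging" in question 2 -- just inspecting the
# partitioning and the plan; not sure this is the right approach:
df = spark.read.parquet(*pred_paths)
print("input partitions:", df.rdd.getNumPartitions())

agg = df.groupBy("point_id", "species_code").agg(
    func.count("pred_occ").alias("ensemble_support"))
agg.explain()  # prints the physical plan (I expect hash aggregate + shuffle)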

Thank you!
-Shawn Ligocki
