[ https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342220#comment-16342220 ]

MBA Learns to Code edited comment on SPARK-23246 at 1/27/18 4:49 PM:
---------------------------------------------------------------------

[~srowen] thanks for the follow-up. I've updated the description to include 
more details about the error messages.

 

I get your point that data about completed jobs also builds up naturally as the 
session goes on. But I feel it is unlikely to be as big a culprit as table/RDD 
metadata: as I increase the complexity of the temporary DataFrames created in 
each iteration (illustrated by a bigger --n-partitions in the example script 
above), the OOM occurs faster. For example, if the temporary DataFrames are 
reasonably complex (--n-partitions = 1,000, say), the OOM occurs after about 
150 iterations in the above setup with spark.driver.memory = 512m. In more 
complex real iterative programs that I run for ML/DL-related workloads, we 
also hit OOM after a few hundred to a few thousand iterations, even when 
spark.driver.memory is large (e.g., 6g).
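
A rough way to watch the buildup would be to poll the driver JVM's heap from 
PySpark via py4j at the end of each iteration of the script above. This is a 
diagnostic sketch only: it relies on the non-public sparkContext._jvm handle, 
and the variables spark and i come from the repro script.

{code:java}
# Diagnostic sketch: poll the driver JVM heap once per iteration of the repro
# script above. NOTE: sparkContext._jvm is a non-public py4j handle, used here
# for illustration only.
runtime = spark.sparkContext._jvm.java.lang.Runtime.getRuntime()
used_mb = (runtime.totalMemory() - runtime.freeMemory()) / (1024.0 * 1024.0)
max_mb = runtime.maxMemory() / (1024.0 * 1024.0)
print('ITERATION #{}: driver JVM heap used ~{:.0f} MB of {:.0f} MB'.format(
    i, used_mb, max_mb))
{code}

If the used-heap figure climbs steadily across iterations even with 
--unpersist and --py-gc, that would point at accumulated driver-side metadata 
rather than per-job bookkeeping alone.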



> (Py)Spark OOM because of iteratively accumulated metadata that cannot be 
> cleared
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-23246
>                 URL: https://issues.apache.org/jira/browse/SPARK-23246
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: MBA Learns to Code
>            Priority: Critical
>
> I am seeing consistent OOM crashes when using PySpark for iterative 
> algorithms in which I create new DataFrames in each iteration (e.g. by 
> sampling from a "mother" DataFrame), do something with them, and never need 
> those DataFrames again in later iterations.
> The script below simulates such OOM failures. Even when one explicitly calls 
> .unpersist() on the temporary DataFrames (the --unpersist flag below) and/or 
> deletes and garbage-collects the Python objects (the --py-gc flag below), the 
> Java-side objects seem to stay around and accumulate until they exceed the 
> JVM/driver memory.
> The more complex the temporary DataFrames in each iteration (illustrated by 
> the --n-partitions flag below), the faster the OOM occurs.
> The typical error messages include:
>  - "java.lang.OutOfMemoryError : GC overhead limit exceeded"
>  - "Java heap space"
>  - "ERROR TransportRequestHandler: Error sending result 
> RpcResponse{requestId=6053742323219781
>  161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 
> cap=64]}} to /<IP ADDR>; closing connection"
> Please suggest how I may overcome this, so that we can have long-running 
> iterative Spark programs that use resources only up to a bounded, 
> controllable limit.
>  
> {code:java}
> from __future__ import print_function
> import argparse
> import gc
> import pandas
> import pyspark
> arg_parser = argparse.ArgumentParser()
> arg_parser.add_argument('--unpersist', action='store_true')
> arg_parser.add_argument('--py-gc', action='store_true')
> arg_parser.add_argument('--n-partitions', type=int, default=1000)
> args = arg_parser.parse_args()
> # create SparkSession (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
> spark = pyspark.sql.SparkSession.builder \
>     .config('spark.executor.instances', '2') \
>     .config('spark.executor.cores', '2') \
>     .config('spark.executor.memory', '512m') \
>     .enableHiveSupport() \
>     .getOrCreate()
> # create Parquet file for subsequent repeated loading
> df = spark.createDataFrame(
>     pandas.DataFrame(
>         dict(
>             row=range(args.n_partitions),
>             x=args.n_partitions * [0]
>         )
>     )
> )
> parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
> df.write.parquet(
>     path=parquet_path,
>     partitionBy='row',
>     mode='overwrite'
> )
> i = 0
> # the loop below simulates an iterative algorithm that creates new DataFrames
> # in each iteration (e.g. by sampling from a "mother" DataFrame), does
> # something with them, and never needs those DataFrames again in later
> # iterations
> # we are having a problem cleaning up the built-up metadata,
> # hence the program will crash after a while because of OOM
> while True:
>     _df = spark.read.parquet(parquet_path)
>     if args.unpersist:
>         _df.unpersist()
>     if args.py_gc:
>         del _df
>         gc.collect()
>     i += 1
>     print('COMPLETED READ ITERATION #{}\n'.format(i))
> {code}
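> 
> For reference, a minimal sketch of an extra cleanup variant that could be 
> tested inside the loop above (an assumption on my part: whether 
> spark.catalog.clearCache() and an explicit JVM GC actually bound the 
> metadata growth is exactly what is unclear):
> {code:java}
> # Sketch only: extra cleanup calls one could add at the end of each iteration
> # of the loop above. spark, parquet_path, gc and i come from the script.
> while True:
>     _df = spark.read.parquet(parquet_path)
>     _df.unpersist()
>     del _df
>     gc.collect()
>     # drop any cached tables/DataFrames on the JVM side
>     spark.catalog.clearCache()
>     # non-public py4j handle: explicitly request a JVM GC (diagnostic only)
>     spark.sparkContext._jvm.java.lang.System.gc()
>     i += 1
>     print('COMPLETED READ ITERATION #{}\n'.format(i))
> {code}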
>  


