[ https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342220#comment-16342220 ]
MBA Learns to Code edited comment on SPARK-23246 at 1/27/18 4:49 PM:
---------------------------------------------------------------------

[~srowen] thanks for the follow-up. I've updated the description to include more details about the error messages.

I get your point that data about completed jobs also builds up naturally as the session goes on. But I feel it is unlikely to be as big a culprit as table/RDD metadata: as I increase the complexity of the temporary DataFrames created in each iteration (illustrated by a bigger --n-partitions in the example script above), the OOM occurs sooner. For example, with reasonably complex temporary DataFrames (say --n-partitions = 1,000), the OOM occurs after about 150 iterations in the setting above with spark.driver.memory = 512m. In more complex, real iterative programs that I run for ML/DL-related workloads, we also hit OOM after a few hundred or a few thousand iterations, even when spark.driver.memory is large (e.g. 6g).


> (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-23246
>                 URL: https://issues.apache.org/jira/browse/SPARK-23246
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: MBA Learns to Code
>            Priority: Critical
>
> I am having consistent OOM crashes when using PySpark for iterative algorithms in which I create new DataFrames in each iteration (e.g. by sampling from a "mother" DataFrame), do something with them, and never need them again in later iterations.
>
> The script below simulates such OOM failures. Even when one explicitly calls .unpersist() on the temporary DataFrames (the --unpersist flag below) and/or deletes and garbage-collects the Python objects (the --py-gc flag below), the Java-side objects seem to stay around and accumulate until they exceed the JVM/driver memory.
>
> The more complex the temporary DataFrames created in each iteration (controlled by the --n-partitions flag below), the sooner the OOM occurs.
>
> Typical error messages include:
> - "java.lang.OutOfMemoryError: GC overhead limit exceeded"
> - "Java heap space"
> - "ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=6053742323219781161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to /<IP ADDR>; closing connection"
>
> Please suggest how I may overcome this, so that we can have long-running iterative Spark programs that use resources only up to a bounded, controllable limit.
>
> {code:java}
> from __future__ import print_function
>
> import argparse
> import gc
>
> import pandas
> import pyspark
>
>
> arg_parser = argparse.ArgumentParser()
> arg_parser.add_argument('--unpersist', action='store_true')
> arg_parser.add_argument('--py-gc', action='store_true')
> arg_parser.add_argument('--n-partitions', type=int, default=1000)
> args = arg_parser.parse_args()
>
>
> # create SparkSession (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
> spark = pyspark.sql.SparkSession.builder \
>     .config('spark.executor.instances', '2') \
>     .config('spark.executor.cores', '2') \
>     .config('spark.executor.memory', '512m') \
>     .enableHiveSupport() \
>     .getOrCreate()
>
>
> # create a Parquet file for subsequent repeated loading
> df = spark.createDataFrame(
>     pandas.DataFrame(
>         dict(
>             row=range(args.n_partitions),
>             x=args.n_partitions * [0]
>         )
>     )
> )
>
> parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
>
> df.write.parquet(
>     path=parquet_path,
>     partitionBy='row',
>     mode='overwrite'
> )
>
>
> i = 0
>
> # the loop below simulates an iterative algorithm that creates new DataFrames
> # in each iteration (e.g. by sampling from a "mother" DataFrame), does something
> # with them, and never needs them again in later iterations;
> # we have not found a way to clean up the metadata that builds up,
> # so the program eventually crashes with an OOM
> while True:
>     _df = spark.read.parquet(parquet_path)
>
>     if args.unpersist:
>         _df.unpersist()
>
>     if args.py_gc:
>         del _df
>         gc.collect()
>
>     i += 1
>     print('COMPLETED READ ITERATION #{}\n'.format(i))
> {code}
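
For clarity, the cleanup attempts described above amount to the following per-iteration sketch (assuming the spark session, parquet_path, and gc import from the script; the spark.catalog.clearCache() call is an extra measure added here only for illustration and is not part of the original repro). As the report above notes, the unpersist/GC steps do not keep the driver-side metadata from accumulating.

{code:python}
# One iteration with every explicit cleanup attempt enabled; a sketch only,
# mirroring the --unpersist / --py-gc code paths of the repro script.
_df = spark.read.parquet(parquet_path)   # temporary DataFrame for this iteration

_df.unpersist()              # release any cached/persisted blocks held for this DataFrame
spark.catalog.clearCache()   # extra attempt (not in the original script): drop all cached tables
del _df                      # drop the Python reference so Py4J can release the JVM-side handle
gc.collect()                 # force a Python GC cycle so that release happens promptly
{code}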