[ https://issues.apache.org/jira/browse/SPARK-20925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030197#comment-16030197 ]

Jeffrey Quinn commented on SPARK-20925:
---------------------------------------

Apologies, will move to the mailing list next time I have a general question 
like that.

I agree key skew is often an issue, but for the data we were testing with, the 
cardinality of the partition column is 1, which helps rule some things out.

I wanted to post again because, after taking another crack at the source, I 
think I may have found a root cause:

The ExecuteWriteTask implementation for a partitioned table 
(org.apache.spark.sql.execution.datasources.FileFormatWriter.DynamicPartitionWriteTask)
 sorts the rows of the table by the partition keys before writing. This makes 
sense as it minimizes the number of OutputWriters that need to be created.
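
To illustrate the pattern, here is a minimal sketch (the Row case class, writePartitioned helper, and newWriter factory are hypothetical illustrations, not Spark's actual classes): once the rows arrive sorted by partition key, the task can close the writer for one partition before opening the next, so at most one writer is open at a time.

```
// Hypothetical sketch of sort-by-partition-key-then-write; not Spark internals.
case class Row(partitionKey: String, payload: String)

def writePartitioned(rows: Seq[Row], newWriter: String => java.io.Writer): Unit = {
  var currentKey: Option[String] = None
  var writer: java.io.Writer = null
  try {
    // Sorted input means each key's rows are contiguous, so we never need
    // more than one writer open at once.
    for (row <- rows.sortBy(_.partitionKey)) {
      if (!currentKey.contains(row.partitionKey)) {
        if (writer != null) writer.close()    // finish the previous partition
        writer = newWriter(row.partitionKey)  // start the next partition's file
        currentKey = Some(row.partitionKey)
      }
      writer.write(row.payload + "\n")
    }
  } finally {
    if (writer != null) writer.close()
  }
}
```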

In the course of doing this, the ExecuteWriteTask uses 
org.apache.spark.sql.execution.UnsafeKVExternalSorter to sort the rows to be 
written. It then gets an iterator over the sorted rows via 
org.apache.spark.sql.execution.UnsafeKVExternalSorter#sortedIterator.

The Javadoc of that method advises that it is the caller's responsibility to 
call org.apache.spark.sql.execution.UnsafeKVExternalSorter#cleanupResources 
(see 
https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java#L176).

However, in ExecuteWriteTask we never appear to call cleanupResources() when we 
are done with the iterator (see 
https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L379).

This seems like it could create a memory leak, which would explain the behavior 
that we have observed.
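
To make that contract concrete, here is a hedged sketch of how the sorted iterator might be consumed safely; the ExternalSorter trait and writeRow callback below are hypothetical stand-ins, not the real UnsafeKVExternalSorter API.

```
// Hypothetical stand-in for the contract the doc comment describes;
// not the actual UnsafeKVExternalSorter signatures.
trait ExternalSorter[K, V] {
  def sortedIterator(): Iterator[(K, V)]  // iterating can keep memory pages / spill files alive
  def cleanupResources(): Unit            // the caller is responsible for calling this
}

def writeSorted[K, V](sorter: ExternalSorter[K, V], writeRow: (K, V) => Unit): Unit = {
  val iter = sorter.sortedIterator()
  try {
    iter.foreach { case (k, v) => writeRow(k, v) }
  } finally {
    // The step that appears to be missing in FileFormatWriter: release the
    // sorter's resources whether the write succeeds or fails.
    sorter.cleanupResources()
  }
}
```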

Luckily, it seems this possible memory leak was fixed, entirely coincidentally, 
by this revision, which changes the behavior for stated performance reasons: 
https://github.com/apache/spark/commit/776b8f17cfc687a57c005a421a81e591c8d44a3f

So the best solution to this issue may be to upgrade to v2.1.1.


> Out of Memory Issues With org.apache.spark.sql.DataFrameWriter#partitionBy
> --------------------------------------------------------------------------
>
>                 Key: SPARK-20925
>                 URL: https://issues.apache.org/jira/browse/SPARK-20925
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Jeffrey Quinn
>
> Observed under the following conditions:
> Spark Version: Spark 2.1.0
> Hadoop Version: Amazon 2.7.3 (emr-5.5.0)
> spark.submit.deployMode = client
> spark.master = yarn
> spark.driver.memory = 10g
> spark.shuffle.service.enabled = true
> spark.dynamicAllocation.enabled = true
> The job we are running is very simple: our workflow reads JSON data stored on 
> S3 and writes out partitioned Parquet files to HDFS.
> As a one-liner, the whole workflow looks like this:
> ```
> sparkSession.sqlContext
>         .read
>         .schema(inputSchema)
>         .json(expandedInputPath)
>         .select(columnMap:_*)
>         .write.partitionBy("partition_by_column")
>         .parquet(outputPath)
> ```
> Unfortunately, for larger inputs, this job consistently fails with containers 
> running out of memory. We observed containers of up to 20GB OOMing, which is 
> surprising because the input data itself is only 15 GB compressed and maybe 
> 100GB uncompressed.
> The error message we get indicates that YARN is killing the containers; it is 
> the executors that are running out of memory, not the driver.
> ```
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 184 in stage 74.0 failed 4 times, most recent failure: Lost task 184.3 in 
> stage 74.0 (TID 19110, ip-10-242-15-251.ec2.internal, executor 14): 
> ExecutorLostFailure (executor 14 exited caused by one of the running tasks) 
> Reason: Container killed by YARN for exceeding memory limits. 21.5 GB of 
> 20.9 GB physical memory used. Consider boosting 
> spark.yarn.executor.memoryOverhead.
> ```
> We tried a full parameter sweep, including using dynamic allocation and 
> setting executor memory as high as 20GB. The result was the same each time, 
> with the job failing due to lost executors due to YARN killing containers.
> We were able to bisect that `partitionBy` is the problem by progressively 
> removing/commenting out parts of our workflow. Finally when we get to the 
> above state, if we remove `partitionBy` the job succeeds with no OOM.


