[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14973529#comment-14973529
 ] 

Jerry Lam edited comment on SPARK-8890 at 10/26/15 12:58 AM:
-------------------------------------------------------------

Hi guys, sorry for injecting comments into the closed JIRA. I just want to point 
out that with Spark 1.5.1 I got an OOM on the driver side after all 
partitions were written out (I have over 1 million partitions). The job was 
marked SUCCESS in the output folder, but the driver kept using significant CPU 
and memory. After several hours, the driver died with an OOM. I had already 
configured the driver to use 6GB. The jstack of the process is as follows:
{code}
Thread 528: (state = BLOCKED)
 - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
 - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130 (Compiled frame)
 - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12, line=114 (Compiled frame)
 - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19, line=415 (Compiled frame)
 - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132 (Compiled frame)
 - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus) @bci=4, line=447 (Compiled frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object) @bci=5, line=447 (Compiled frame)
 - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=9, line=244 (Compiled frame)
 - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=2, line=244 (Compiled frame)
 - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
 - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Compiled frame)
 - scala.collection.TraversableLike$class.map(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244 (Compiled frame)
 - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[]) @bci=279, line=447 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh() @bci=8, line=453 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute() @bci=26, line=465 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache() @bci=12, line=463 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1, line=540 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh() @bci=1, line=204 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp() @bci=392, line=152 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext, org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96, line=56 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext) @bci=718, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute() @bci=20, line=57 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult() @bci=15, line=57 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12, line=69 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=11, line=140 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1, line=138 (Interpreted frame)
 - org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext, java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189, line=138 (Interpreted frame)
 - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute() @bci=21, line=933 (Interpreted frame)
 - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13, line=933 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext, java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode, scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293, line=197 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24, line=137 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String) @bci=8, line=304 (Interpreted frame)
{code}
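
For readers following the trace: the blocked thread is inside FileStatusCache.listLeafFiles, building a path string per file status while the relation is refreshed after the write. The pattern can be sketched roughly as below. This is only an illustration under stated assumptions, not Spark's code: it uses java.nio.file instead of the Hadoop FileSystem API, and the class LeafFileListingSketch and its methods are hypothetical names. The point it shows is that the listing materializes one string per leaf file, so memory on the listing side grows with the number of files under the table root.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical illustration; not taken from Spark or Hadoop.
final class LeafFileListingSketch {

    // Walks the tree under root and returns one path string per leaf file.
    // Every string is held in memory at once, as in a driver-side file cache.
    static List<String> listLeafFiles(Path root) {
        List<String> leaves = new ArrayList<>();
        collect(root, leaves);
        return leaves;
    }

    private static void collect(Path dir, List<String> out) {
        try (Stream<Path> children = Files.list(dir)) {
            children.forEach(child -> {
                if (Files.isDirectory(child)) {
                    collect(child, out);      // descend into a partition directory
                } else {
                    out.add(child.toString()); // one string allocated per leaf file
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a small temporary table layout (one directory and one file per
    // partition) and returns how many leaf files the listing finds.
    static int countLeavesInDemoTable(int partitions) {
        try {
            Path root = Files.createTempDirectory("table");
            for (int i = 0; i < partitions; i++) {
                Path dir = Files.createDirectories(root.resolve("key=" + i));
                Files.createFile(dir.resolve("part-00000.parquet"));
            }
            return listLeafFiles(root).size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countLeavesInDemoTable(3));
    }
}
```

With over 1 million partitions, a listing of this shape has to hold at least one entry per partition directory's files, which is consistent with the driver spending its time in string building.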



> Reduce memory consumption for dynamic partition insert
> ------------------------------------------------------
>
>                 Key: SPARK-8890
>                 URL: https://issues.apache.org/jira/browse/SPARK-8890
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>            Assignee: Michael Armbrust
>            Priority: Critical
>             Fix For: 1.5.0
>
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)
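
The approach in the description above can be sketched roughly as follows. This is a minimal simulation under stated assumptions, not the change that went into Spark: the class DynamicPartitionWriteSketch, the Row and Result types, and the maxOpenWriters parameter are all hypothetical, in-memory lists stand in for output writers, and the sort is done in memory where a real implementation would need an external sort. It keeps one writer per partition until the threshold is reached, then sorts the remaining rows by partition key so that only one writer is open at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; names and structure are assumptions for illustration.
final class DynamicPartitionWriteSketch {

    static final class Row {
        final String partition;
        final String value;
        Row(String partition, String value) {
            this.partition = partition;
            this.value = value;
        }
    }

    static final class Result {
        final Map<String, List<String>> output = new HashMap<>();
        int peakOpenWriters = 0;
    }

    static Result write(Iterator<Row> rows, int maxOpenWriters) {
        Result result = new Result();
        Map<String, List<String>> open = new HashMap<>(); // stand-ins for open writers
        List<Row> remaining = new ArrayList<>();

        // Phase 1: one writer per partition while the count stays within the limit.
        while (rows.hasNext()) {
            Row row = rows.next();
            if (open.containsKey(row.partition) || open.size() < maxOpenWriters) {
                open.computeIfAbsent(row.partition, k -> new ArrayList<>()).add(row.value);
                result.peakOpenWriters = Math.max(result.peakOpenWriters, open.size());
            } else {
                remaining.add(row); // threshold hit: switch to the sorted path
                break;
            }
        }
        // Close every phase-1 writer before continuing.
        open.forEach((k, v) -> result.output.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
        open.clear();

        // Phase 2: sort the rest by partition key, then write one partition at a time.
        rows.forEachRemaining(remaining::add);
        remaining.sort(Comparator.comparing((Row r) -> r.partition));
        String current = null;
        for (Row row : remaining) {
            if (!row.partition.equals(current)) {
                current = row.partition; // previous writer closed, next one opened
                result.peakOpenWriters = Math.max(result.peakOpenWriters, 1);
            }
            result.output.computeIfAbsent(current, x -> new ArrayList<>()).add(row.value);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("a", "v0"), new Row("b", "v1"), new Row("c", "v2"),
            new Row("a", "v3"), new Row("d", "v4"), new Row("b", "v5"));
        Result r = write(rows.iterator(), 2);
        System.out.println(r.output.size() + " partitions, peak open writers = " + r.peakOpenWriters);
    }
}
```

In this sketch the number of simultaneously open writers never exceeds the larger of maxOpenWriters and 1, regardless of how many distinct partition keys appear in the input. A partition first seen in phase 1 may be written again in phase 2, which a real writer would have to handle, for example by producing a second file for that partition.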



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

