[ https://issues.apache.org/jira/browse/SPARK-17503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Zhong updated SPARK-17503:
-------------------------------
    Description: 
h2. Problem description

The following code triggers an out-of-memory error.

{code}
sc.parallelize(1 to 10000000, 5).map(_ => new Array[Long](1000)).cache().count()
{code}

This is not expected; Spark should fall back to disk instead when there is not enough memory to cache the block.
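
As a side note, the OOM can be sidestepped in the meantime (this is not a fix for the underlying leak) by requesting disk fallback explicitly, since cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), which never spills. A minimal sketch:
{code}
// Workaround sketch: allow partitions that do not fit in memory to spill to
// disk, instead of the default MEMORY_ONLY level implied by cache().
import org.apache.spark.storage.StorageLevel

sc.parallelize(1 to 10000000, 5)
  .map(_ => new Array[Long](1000))
  .persist(StorageLevel.MEMORY_AND_DISK)
  .count()
{code}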

Stacktrace:
{code}
scala> sc.parallelize(1 to 10000000, 5).map(f).cache().count
[Stage 0:>                                                          (0 + 5) / 5]
16/09/11 17:27:20 WARN MemoryStore: Not enough space to cache rdd_1_4 in memory! (computed 631.5 MB so far)
16/09/11 17:27:20 WARN MemoryStore: Not enough space to cache rdd_1_0 in memory! (computed 631.5 MB so far)
16/09/11 17:27:20 WARN BlockManager: Putting block rdd_1_0 failed
16/09/11 17:27:20 WARN BlockManager: Putting block rdd_1_4 failed
16/09/11 17:27:21 WARN MemoryStore: Not enough space to cache rdd_1_1 in memory! (computed 947.3 MB so far)
16/09/11 17:27:21 WARN BlockManager: Putting block rdd_1_1 failed
16/09/11 17:27:22 WARN MemoryStore: Not enough space to cache rdd_1_3 in memory! (computed 1423.7 MB so far)
16/09/11 17:27:22 WARN BlockManager: Putting block rdd_1_3 failed

java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid26528.hprof ...
Heap dump file created [6551021666 bytes in 9.876 secs]
16/09/11 17:28:15 WARN NettyRpcEnv: Ignored message: HeartbeatResponse(false)
16/09/11 17:28:15 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@46c9ce96,BlockManagerId(driver, 127.0.0.1, 55360))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:523)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:552)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:552)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:552)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:552)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
        ... 14 more
16/09/11 17:28:15 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.lang.OutOfMemoryError: Java heap space
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
        at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/09/11 17:28:15 ERROR Executor: Exception in task 4.0 in stage 0.0 (TID 4)
java.lang.OutOfMemoryError: Java heap space
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
        at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/09/11 17:28:15 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: Java heap space
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
        at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/09/11 17:28:15 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-4,5,main]
java.lang.OutOfMemoryError: Java heap space
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
        at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/09/11 17:28:15 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, localhost): java.lang.OutOfMemoryError: Java heap space
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
        at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
        at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}

h2. Analysis

When an RDD partition is too big to cache in memory, Spark's MemoryStore returns a PartiallyUnrolledIterator.
{code}
    // MemoryStore.scala, around line 287
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, vector.estimateSize())
      Left(new PartiallyUnrolledIterator(
        this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values))
    }
{code}

The parameter 'unrolled' is an iterator over an in-memory array buffer ('vector') that stores all of the input values read so far while trying to cache the RDD. The parameter 'rest' is an iterator over the input values that have not been read yet.
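
To illustrate the retention problem, here is a simplified sketch (a hypothetical class, not Spark's actual implementation) of an iterator that concatenates the two parts the way PartiallyUnrolledIterator does:
{code}
// Simplified sketch of the leak pattern (hypothetical, not Spark's code).
// Because 'unrolled' is referenced from the methods, it becomes a field of
// the wrapper, so the buffer behind it stays strongly reachable for as long
// as the wrapper is alive, even after every buffered value was handed out.
class LeakyConcatIterator[T](unrolled: Iterator[T], rest: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = unrolled.hasNext || rest.hasNext

  override def next(): T =
    if (unrolled.hasNext) unrolled.next() else rest.next()
}
{code}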

For example, if the input RDD partition is 100 GB and the Spark executor has a 10 GB cache, then 'unrolled' references roughly 10 GB of buffered data, while the 'rest' iterator covers the remaining 90 GB of unread input.

We expect the 10 GB held by 'unrolled' to become garbage-collectable as soon as all of its values have been consumed. But the current Spark code keeps a reference to 'unrolled' until the entire 100 GB of input has been processed, so that memory cannot be reclaimed in the meantime.
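
One possible fix direction (an assumption on my part, not a merged patch) is to drop the reference to the consumed iterator as soon as it is exhausted, so the buffered values can be collected while 'rest' is still being drained:
{code}
// Sketch of a possible fix (assumption, not the actual patch): null out the
// consumed iterator so the buffer behind it becomes garbage-collectable
// early. In the real MemoryStore this should also release the unroll memory
// accounted to the task.
class ReleasingConcatIterator[T](
    private[this] var unrolled: Iterator[T],
    rest: Iterator[T]) extends Iterator[T] {

  // Drop the field reference once the buffered part is fully consumed.
  private def releaseIfDone(): Unit = {
    if (unrolled != null && !unrolled.hasNext) {
      unrolled = null
    }
  }

  override def hasNext: Boolean = {
    releaseIfDone()
    unrolled != null || rest.hasNext
  }

  override def next(): T = {
    releaseIfDone()
    if (unrolled != null) unrolled.next() else rest.next()
  }
}
{code}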


> Memory leak in Memory store when unable to cache the whole RDD
> --------------------------------------------------------------
>
>                 Key: SPARK-17503
>                 URL: https://issues.apache.org/jira/browse/SPARK-17503
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.2, 2.0.0, 2.1.0
>            Reporter: Sean Zhong



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
