[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2018-03-29 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419062#comment-16419062
 ] 

Brad commented on SPARK-22618:
--

Yeah, the fix for broadcast unpersist should be basically the same. Thanks, 
Thomas.

> RDD.unpersist can cause fatal exception when used with dynamic allocation
> -
>
> Key: SPARK-22618
> URL: https://issues.apache.org/jira/browse/SPARK-22618
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: Brad
> Assignee: Brad
> Priority: Minor
> Fix For: 2.3.0
>
>
> If you use rdd.unpersist() with dynamic allocation, an executor can be
> deallocated while your RDD is being removed, which throws an uncaught
> exception that kills your job.
> I looked into different ways of preventing this error from occurring but
> couldn't come up with anything that wouldn't require a big change. I propose
> that the best fix is just to catch and log IOExceptions in unpersist() so they
> don't kill your job. This will match the effective behavior when executors
> are lost from dynamic allocation in other parts of the code.
> In the worst-case scenario I think this could lead to RDD partitions getting
> left on executors after they were unpersisted, but this is probably better
> than the whole job failing. I think in most cases the IOException would be
> due to the executor dying for some reason, which is effectively the same
> result as unpersisting the RDD from that executor anyway.
> I noticed this exception in a job that loads a 100GB dataset on a cluster
> where we use dynamic allocation heavily. Here is the relevant stack trace:
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
> at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
> at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
> at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
> at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
> at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
> at com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
> at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
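
The fix proposed in the issue description (catch and log IOExceptions during unpersist so that a lost executor does not kill the job) could look roughly like the sketch below. This is only an illustration and does not use Spark: `removeFromExecutor`, `removeRddTolerant`, the executor ids, and the block count of 3 are all hypothetical stand-ins, and this is not the code from the actual pull request.

```scala
import java.io.IOException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UnpersistSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for asking one executor to drop its blocks of an RDD.
  // Succeeds with the number of blocks removed, or fails with an IOException
  // if the executor has already been deallocated.
  def removeFromExecutor(executorId: String, alive: Boolean): Future[Int] =
    Future {
      if (!alive) throw new IOException(s"Connection reset by peer ($executorId)")
      3 // assumed block count, for illustration only
    }

  // Catch and log only IOException; a lost executor counts as zero blocks removed.
  // Any other failure still propagates to the caller.
  def removeRddTolerant(executors: Map[String, Boolean]): Future[Int] = {
    val perExecutor = executors.toSeq.map { case (id, alive) =>
      removeFromExecutor(id, alive).recover {
        case e: IOException =>
          System.err.println(s"WARN: failed to remove RDD blocks on $id: ${e.getMessage}")
          0
      }
    }
    Future.sequence(perExecutor).map(_.sum)
  }

  def main(args: Array[String]): Unit = {
    // exec-2 simulates an executor removed by dynamic allocation mid-unpersist.
    val removed = Await.result(
      removeRddTolerant(Map("exec-1" -> true, "exec-2" -> false)), 10.seconds)
    println(s"blocks removed: $removed")
  }
}
```

With one live and one lost executor, the blocking wait completes normally and only a warning is logged for the lost one. The trade-off is the one named in the description: if the executor were in fact still alive, its partitions could stay cached after the unpersist call.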

[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2018-03-28 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417466#comment-16417466
 ] 

Thomas Graves commented on SPARK-22618:
---

I'll file a separate JIRA for it and put up a PR.


[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2018-03-27 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416821#comment-16416821
 ] 

Wenchen Fan commented on SPARK-22618:
-

Looks like we can apply the same fix to `Broadcast.unpersist`. Do you want to 
send a PR to fix it? Thanks!


[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2018-03-27 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416309#comment-16416309
 ] 

Thomas Graves commented on SPARK-22618:
---

Thanks for fixing this; I'm hitting it now in Spark 2.2. I think this same issue 
can happen with broadcast variables if it's told to wait. Did you happen to look 
at that at the same time?


[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2017-11-28 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269374#comment-16269374
 ] 

Apache Spark commented on SPARK-22618:
--

User 'brad-kaiser' has created a pull request for this issue:
https://github.com/apache/spark/pull/19836


[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2017-11-27 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267223#comment-16267223
 ] 

Brad commented on SPARK-22618:
--

I have a PR for this forthcoming.
