[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419062#comment-16419062 ]

Brad commented on SPARK-22618:
------------------------------

Yeah, the fix for broadcast unpersist should be basically the same. Thanks Thomas.

> RDD.unpersist can cause fatal exception when used with dynamic allocation
> -------------------------------------------------------------------------
>
>                 Key: SPARK-22618
>                 URL: https://issues.apache.org/jira/browse/SPARK-22618
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Brad
>            Assignee: Brad
>            Priority: Minor
>             Fix For: 2.3.0
>
> If you use rdd.unpersist() with dynamic allocation, an executor can be
> deallocated while your RDD is being removed, which throws an uncaught
> exception that kills your job.
>
> I looked into different ways of preventing this error from occurring but
> couldn't come up with anything that wouldn't require a big change. I propose
> that the best fix is just to catch and log IOExceptions in unpersist() so they
> don't kill your job. This will match the effective behavior when executors
> are lost from dynamic allocation in other parts of the code.
>
> In the worst-case scenario I think this could lead to RDD partitions getting
> left on executors after they were unpersisted, but this is probably better
> than the whole job failing. I think in most cases the IOException would be
> due to the executor dying for some reason, which is effectively the same
> result as unpersisting the RDD from that executor anyway.
>
> I noticed this exception in a job that loads a 100GB dataset on a cluster
> where we use dynamic allocation heavily. Here is the relevant stack trace:
>
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>     at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
>     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
>     at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
>     at java.lang.Thread.run(Thread.java:748)
> Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
>     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>     at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
>     at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
>     at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
>     at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
>     at com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
>     at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
>     at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
>     at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.List.map(List.scala:285)
>     at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
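
The "catch and log IOExceptions" idea proposed in the issue description can be illustrated with a small standalone sketch. This is not the actual Spark patch and does not use Spark at all: `UnpersistSketch`, `askExecutorToRemove`, and `removeFromAll` are hypothetical names, and the "executors" are simulated with a map of id to liveness. It only shows the general shape, assuming each per-executor removal is a `Future` that fails with an `IOException` when the executor has gone away.

```scala
import java.io.IOException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UnpersistSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for asking one executor to drop its cached blocks.
  // Yields the number of blocks removed, or fails if the executor is gone.
  def askExecutorToRemove(executorId: String, alive: Boolean): Future[Int] =
    Future {
      if (!alive) throw new IOException(s"Connection reset by peer ($executorId)")
      2 // assumption for the demo: every live executor held two blocks
    }

  // Recover from IOException per executor, so that one lost executor is
  // logged and counted as zero blocks instead of failing the whole call.
  def removeFromAll(executors: Map[String, Boolean]): Int = {
    val futures = executors.toSeq.map { case (id, alive) =>
      askExecutorToRemove(id, alive).recover {
        case e: IOException =>
          Console.err.println(s"WARN: could not remove blocks from $id: ${e.getMessage}")
          0
      }
    }
    // Block until every per-executor request has succeeded or been recovered.
    Await.result(Future.sequence(futures), 10.seconds).sum
  }

  def main(args: Array[String]): Unit = {
    val removed = removeFromAll(Map("exec-1" -> true, "exec-2" -> false, "exec-3" -> true))
    println(s"blocks removed: $removed")
  }
}
```

Only `IOException` is recovered here, so any other failure would still propagate to the caller. That mirrors the trade-off stated in the description: a lost executor is treated as "nothing left to remove", at the cost of possibly leaving partitions behind in the rare case where the executor is still alive.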
[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417466#comment-16417466 ]

Thomas Graves commented on SPARK-22618:
---------------------------------------

I'll file a separate Jira for it and put up a PR.
[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416821#comment-16416821 ]

Wenchen Fan commented on SPARK-22618:
-------------------------------------

Looks like we can apply the same fix to `Broadcast.unpersist`. Do you want to send a PR to fix? Thanks!
[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416309#comment-16416309 ]

Thomas Graves commented on SPARK-22618:
---------------------------------------

Thanks for fixing this, hitting it now in Spark 2.2. I think this same issue can happen with broadcast variables if it's told to wait. Did you happen to look at that at the same time?
[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269374#comment-16269374 ]

Apache Spark commented on SPARK-22618:
--------------------------------------

User 'brad-kaiser' has created a pull request for this issue:
https://github.com/apache/spark/pull/19836
[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267223#comment-16267223 ]

Brad commented on SPARK-22618:
------------------------------

I have a PR for this forthcoming.