[ https://issues.apache.org/jira/browse/SPARK-30720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrei Stankevich updated SPARK-30720:
--------------------------------------
Description: 
We are using Spark 2.4.3 on Mesos with the external shuffle service. The external shuffle service is launched by systemd with the command
{code:java}
/bin/bash -ce "exec /*/spark/bin/spark-class org.apache.spark.deploy.mesos.MesosExternalShuffleService"
{code}
Sometimes a Spark executor hits a connection timeout while trying to connect to the external shuffle service. When that happens the executor logs
{noformat}
ERROR BlockManager: Failed to connect to external shuffle server, will retry 4 more times after waiting 5 seconds...{noformat}
If the connection times out on all of the retries, the executor fails with
{noformat}
ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Unable to register with external shuffle server due to : Failed to connect to our-host.com/10.103..:7337
{noformat}
After this error the Spark application just hangs: on the Mesos UI the framework moves to the inactive frameworks list, and on the Spark driver UI I can see a few failed tasks while the application appears to do nothing further.

The external shuffle service logs
{code:java}
ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=4941243310586976766, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13]}} to /10.103.*.*:49482; closing connection{code}
The full Spark executor log is: ERROR BlockManager: Failed to connect to external shuffle server, will retry 1 more times after waiting 5 seconds... java.io.IOException: Failed to connect to our-host.com/10.103.*.*:7337 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245) at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:201) at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:142) at org.apache.spark.storage.BlockManager.$anonfun$registerWithExternalShuffleServer$3(BlockManager.scala:295) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:291) at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:265) at org.apache.spark.executor.Executor.<init>(Executor.scala:118) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83) at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102) at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: our-host.com/10.103.*.*:7337 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633) at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) ... 1 more Caused by: java.net.ConnectException: Connection timed out ... 11 more ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Unable to register with external shuffle server due to : Failed to connect to our-host.com/10.103.**.**:7337 org.apache.spark.SparkException: Unable to register with external shuffle server due to : Failed to connect to our-host.com/10.103.*.*:7337 at org.apache.spark.storage.BlockManager.$anonfun$registerWithExternalShuffleServer$3(BlockManager.scala:304) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:291) at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:265) at org.apache.spark.executor.Executor.<init>(Executor.scala:118) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83) at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102) at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Failed to connect to our-host.com/10.103.**.**:7337 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245) at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:201) at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:142) at org.apache.spark.storage.BlockManager.$anonfun$registerWithExternalShuffleServer$3(BlockManager.scala:295) ... 12 more Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: our-host.com/10.103.**.**:7337 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) ... 1 more Caused by: java.net.ConnectException: Connection timed out ... 
11 more INFO DiskBlockManager: Shutdown hook called INFO ShutdownHookManager: Shutdown hook called I0131 16:29:25.446748 3768 executor.cpp:1039] Command exited with status 1 (pid: 3795) I0131 16:29:26.447976 3794 process.cpp:935] Stopped the socket accept loop Full external shuffle service log is ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=4941243310586976766, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13]}} to /10.103.*.*:49482; closing connection java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148) at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:111) at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831) at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041) at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300) at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:288) at org.apache.spark.network.server.TransportRequestHandler.access$000(TransportRequestHandler.java:45) at org.apache.spark.network.server.TransportRequestHandler$1.onSuccess(TransportRequestHandler.java:183) at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:102) at 
org.apache.spark.deploy.mesos.MesosExternalShuffleBlockHandler.handleMessage(MesosExternalShuffleService.scala:78) at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:180) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103) at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) at java.lang.Thread.run(Thread.java:748) ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=4713235420893637000, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13]}} to /10.103.*.*:49482; closing connection java.nio.channels.ClosedChannelException
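For reference, the retry-and-fail behaviour above is controlled by a handful of Spark network settings. This is only an illustrative sketch: the property names are taken from the Spark configuration documentation, the values shown are defaults as I understand them (not necessarily what our jobs run with), and the application class and jar are placeholders.
{code:java}
# Illustrative sketch only. Property names are from the Spark configuration docs;
# values are assumed defaults. com.example.MyApp and my-app.jar are placeholders.
spark-submit \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.shuffle.service.port=7337 \
  --conf spark.shuffle.registration.timeout=5000 \
  --conf spark.shuffle.registration.maxAttempts=3 \
  --conf spark.shuffle.io.connectionTimeout=120s \
  --class com.example.MyApp my-app.jar
{code}
Tuning these only changes how long the executor keeps retrying; the framework staying hung after the final failure is the problem reported here.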
> Spark framework hangs and becomes inactive on the Mesos UI if the executor cannot connect to the external shuffle service.
> ---------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-30720
> URL: https://issues.apache.org/jira/browse/SPARK-30720
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.3
> Reporter: Andrei Stankevich
> Priority: Major
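As a basic check when this happens, plain TCP reachability of the shuffle service port from the affected Mesos agent rules out firewall or listener problems. This is just a diagnostic sketch; the host and port are the ones that appear in the error messages above and should be replaced for other clusters.
{code:java}
# Diagnostic sketch. our-host.com and 7337 are the host/port from the logs above.
# Run from the Mesos agent that hosted the failing executor:
nc -vz -w 5 our-host.com 7337
# Run on the shuffle service host to confirm something is listening on the port:
ss -ltn | grep 7337
{code}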