[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17753038#comment-17753038 ] nebi mert aydin commented on SPARK-24578: - I still have this problem in Amazon EMR spark 3.1.2, any idea? > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Assignee: Wenbo Zhao >Priority: Blocker > Fix For: 2.3.2, 2.4.0 > > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColu
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518300#comment-16518300 ] Imran Rashid commented on SPARK-24578: -- Given the severity of the issue and that its a regression in 2.3, I've set the target version and made it a blocker > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Blocker > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.ca
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517532#comment-16517532 ] Wenbo Zhao commented on SPARK-24578: woop, [~attilapiros], sorry, I didn't you have created a PR. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count > (1 to 10).toArray.par.map { i => println(i);
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517529#comment-16517529 ] Apache Spark commented on SPARK-24578: -- User 'WenboZhao' has created a pull request for this issue: https://github.com/apache/spark/pull/21593 > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count >
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517515#comment-16517515 ] Attila Zsolt Piros commented on SPARK-24578: [~wbzhao] oh sorry I read your comment late, definitely without your help (pointing to the right commit caused the problem) it would took much much longer. If you like please create another PR, it is fine for me. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColu
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517505#comment-16517505 ] Apache Spark commented on SPARK-24578: -- User 'attilapiros' has created a pull request for this issue: https://github.com/apache/spark/pull/21592 > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517465#comment-16517465 ] Wenbo Zhao commented on SPARK-24578: [~attilapiros] if don't mind, I could create a PR for it :) > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count > (1 to 10).toArray.par.map { i => println(i); >
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517461#comment-16517461 ] Marcelo Vanzin commented on SPARK-24578: Ah, I see. That makes sense. (I actually took at look at netty changes and didn't find anything in this area, so was starting to wonder.) > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColu
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517458#comment-16517458 ] Wenbo Zhao commented on SPARK-24578: Hi [~vanzin]. the commit [https://github.com/apache/spark/commit/16612638f0539f197eb7deb1be2ec53fed60d707#diff-a1f697fbd4610dff4eb8ff848e8cea2a] is only included since 2.3.0. If I understand right, the root cause is that we don't do `consolidateIfNeeded` anymore for many small chunks which causes the buf.notBuffer() has bad performance in the case that we have to call `copyByteBuf()` many times. In my example, the many times = 500MB / 256 KB. [~attilapiros] I applied {code:java} ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code} in our spark distribution and ran through the test code in this jira. This works fine. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517452#comment-16517452 ] Attila Zsolt Piros commented on SPARK-24578: [~wbzhao] Yes, you are right, readerIndex is the first (I just checked skipBytes), thanks. [~irashid] Yes, I can create a PR soon. [~vanzin] Netty was upgraded between 2.2.1 and 2.3.0: from 4.0.43.Final to 4.1.17.Final. Could it be? > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517449#comment-16517449 ] Marcelo Vanzin commented on SPARK-24578: Attila's suggestion looks good, but I wonder what caused this to show up in 2.3... the code he's changing has been there since 2.0, so my guess would be something changed in Netty (which was upgraded in 2.3). Anyway, just curious about the underlying cause of the change. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517441#comment-16517441 ] Wenbo Zhao commented on SPARK-24578: [~irashid] Yes, that is exactly what I saw in our side. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count > (1 to 10).toArray.par.map { i => println(i); > df.
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517440#comment-16517440 ] Wenbo Zhao commented on SPARK-24578: Hi [~attilapiros], I guess what you suggest is {code:java} ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code} Will report back how my test goes. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withCo
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517439#comment-16517439 ] Imran Rashid commented on SPARK-24578: -- I think [~attilapiros] may be right -- can you send a PR to fix? We were able to reproduce the issue with the code you gave, and noticed that the errors occur exactly at the two minute timeout. The receiver closes the connection because of the timeout, so then the sender just has a generic failure that the connection was closed. Sender: {noformat} 18/06/18 10:12:08 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=879251518000, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@19b0a640} to /xx:38710; closing connection java.io.IOException: Connection reset by peer {noformat} Receiver: {noformat} 18/06/18 10:12:08 ERROR server.TransportChannelHandler: Connection to xxx.com/:38765 has been quiet for 12 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. 18/06/18 10:12:08 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from x.com/:38765 is closed 18/06/18 10:12:08 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms {noformat} At first I found it hard to believe this slow down would lead to the connection appearing totally quiet -- but i realized that there is actually a lot of concurrent activity going on, as all the executors are also sending and receiving many more blocks from each other. So if you're simultaneously dealing with a bunch of large blocks, and we put this big slow down inside the netty threads, this could lead to things appearing idle. [~wbzhao] does this match the cases you see as well? > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerCo
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517416#comment-16517416 ] Attila Zsolt Piros commented on SPARK-24578: I have written a small test I know it is a bit naive but I still thing it shows us something: [https://gist.github.com/attilapiros/730d67b62317d14f5fd0f6779adea245] And the result is: {noformat} n = 500 duration221 = 2 duration221.new = 0 duration230 = 5242 duration230.new = 7 {noformat} My guess the receiver timeouts and the sender is writing into a closed socket. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0",
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517287#comment-16517287 ] Attila Zsolt Piros commented on SPARK-24578: The copyByteBuf() along with transferTo() is called several times on a huge byteBuf. If we have a really huge chunk from small chunks we are re-merging the small chunks again and again by [https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L140|https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L140.] although we need only a very small part of it. Is it possible the following line would help? {code:java} ByteBuffer buffer = buf.nioBuffer(0, Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code} > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517180#comment-16517180 ] Wenbo Zhao commented on SPARK-24578: After digging more details, this commit [https://github.com/apache/spark/commit/16612638f0539f197eb7deb1be2ec53fed60d707#diff-a1f697fbd4610dff4eb8ff848e8cea2a] came to my attention. I reverted this commit in our Spark distribution and found out the issue went away. I also examined the log to verify that performance of [https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L142] is back to normal. It is able to transfer the 500MB data in 2s~3s. However, it is still not quite clear why that commit is causing the issue. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516621#comment-16516621 ] Wenbo Zhao commented on SPARK-24578: Hi [~irashid], many thanks for clarifying my questions. I tried to compare the logs between Spark 2.2.1 and 2.3.0 to see if there something interesting but out of luck so far. I added a few logging to benchmark how much time we need to transfer ~500MB data in the [https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L142]. In Spark 2.2.1, it only took <= 2s, but in Spark 2.3.0, it is very slow and never got finish with 120s and thus timeout. Are you able to reproduce the issue? > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an impo
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516611#comment-16516611 ] Imran Rashid commented on SPARK-24578: -- btw to answer your initial questions: {quote} 1. what is the right behavior, should we re-compute or should we transfer block from remote? 2. if we should transfer from remote, why the performance is so bad for cache block? {quote} Spark should try to fetch from the remote, and if that fails for some reason, it should fall back to recomputing. so you should actually see a similar sequence of events in the logs in spark 2.3.0 as you do in your spark 2.2.1 snippet -- the difference being the remote fetch fails in 2.3.0, and so instead it does the recomputation. The real issue here is why the remote fetches are failing. Unfortunately there isn't a ton of info in that stack trace -- are there any other warning messages before that in the logs? I can see how setting spark.locality.wait lets you workaround this, but its definitely not an ideal solution. I wouldn't rule out that commit you mentioned as the issue -- its certainly possible. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at >
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516609#comment-16516609 ] Wenbo Zhao commented on SPARK-24578: For now, we could reproduce this issue in completely different env and different distribution of Spark 2.3.0. We also observed that by setting spark.locality.wait to be a big number, e.g. 60s could help but this reduces the our system throughput significantly. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).wit
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516602#comment-16516602 ] Wenbo Zhao commented on SPARK-24578: [~irashid] We didn't touch "spark.maxRemoteBlockSizeFetchToMem". You are right. After digging more details, I don't think that commit is relevant, > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withCol
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516527#comment-16516527 ] Imran Rashid commented on SPARK-24578: -- [~wbzhao] [~icexelloss] you're saying this is *without* touching the value of "spark.maxRemoteBlockSizeFetchToMem", right? If you aren't turning on fetch-to-disk, then the behavior shouldn't change in 2.3.0 (though of course there may be bugs) cc [~jerryshao] [~attilapiros] > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515879#comment-16515879 ] Li Jin commented on SPARK-24578: cc @gatorsmile We found this when switching from 2.2.1 to 2.3.0 in one of our applications. The implication is pretty bad - the time outs significantly hurt the performance (20s to several minutes for some jobs). This could affect other Spark 2.3 users too because it's pretty easy to reproduce. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).wit
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515858#comment-16515858 ] Wenbo Zhao commented on SPARK-24578: An easier reproduciable cluster setting is 10 executors each with 2 cores and 15G memory. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores (the memory of driver and executors are not a > import factor here as long as it is big enough, say 20G). > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count > (1 to 10).toArray.par.m