[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-24578:
-----------------------------------
    Labels: pull-request-available  (was: )

                 Key: SPARK-24578
                 URL: https://issues.apache.org/jira/browse/SPARK-24578
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.0, 2.3.1
            Reporter: Wenbo Zhao
            Assignee: Wenbo Zhao
            Priority: Blocker
              Labels: pull-request-available
             Fix For: 2.3.2, 2.4.0

After Spark 2.3, we observed lots of errors like the following in some of our production jobs:

{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
    at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
    at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
    at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}

Here is a small reproducer for a small cluster of 2 executors (say host-1 and host-2), each with 8 cores. The memory of the driver and executors is not an important factor here, as long as it is big enough, say 20G.

{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0
  .withColumn("x0", rand())
  .withColumn("x0", rand())
  .withColumn("x1", rand())
  .withColumn("x2", rand())
  .withColumn("x3", rand())
  .withColumn("x4", rand())
  .withColumn("x5", rand())
  .withColumn("x6", rand())
  .withColumn("x7", rand())
  .withColumn("x8", rand())
  .withColumn("x9", rand())
df.cache; df.count
(1 to 10).toArray.par.map { i =>
  println(i)
  df.groupBy("x1").agg(count("value")).show()
}
{code}

In the above example, we generate a random DataFrame of around 7G in size, cache it, and then perform parallel DataFrame operations using `array.par.map`. Because of the parallel computation, it is very likely that some task gets scheduled on host-2 and needs to read the cached block data from host-1. This follows the code path of [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] and then tries to transfer a big cached block (~500MB) from host-1 to host-2. Often, this big transfer makes the cluster suffer timeouts (it will retry 3 times, each with a 120s timeout, and then recompute to put the cache block locally). We could not reproduce the same issue in Spark 2.2.1.
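Each cached partition is served as a single block, so with the default parallelism here (2 executors x 8 cores = 16 partitions) a ~7G cache presumably yields blocks of roughly the ~500MB size mentioned above. A hedged workaround sketch, not the reported fix: repartitioning before caching shrinks each remotely fetched block. It reuses the `df` built in the reproducer above, and the partition count 200 is an illustrative value.

{code:java}
// Workaround sketch, assuming the `df` from the reproducer above.
// More partitions mean smaller cached blocks, so each remote fetch
// moves far less data per request.
val dfSmallBlocks = df.repartition(200) // ~7G / 200 ≈ 35MB per cached block
dfSmallBlocks.cache; dfSmallBlocks.count
(1 to 10).toArray.par.map { i =>
  dfSmallBlocks.groupBy("x1").agg(count("value")).show()
}
{code}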
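The "retry 3 times, each with 120s timeout" behavior matches the stock defaults of spark.network.timeout (120s) and spark.shuffle.io.maxRetries (3). Another hedged mitigation sketch for affected versions is to raise those settings so large cached-block transfers get more headroom; the concrete values below are illustrative assumptions, not recommendations.

{code:java}
import org.apache.spark.sql.SparkSession

// Hedged mitigation sketch: widen the transfer window and retry budget.
// Defaults: spark.network.timeout=120s, spark.shuffle.io.maxRetries=3,
// spark.shuffle.io.retryWait=5s.
val spark = SparkSession.builder()
  .appName("SPARK-24578 mitigation sketch")
  .config("spark.network.timeout", "600s")     // illustrative value
  .config("spark.shuffle.io.maxRetries", "6")  // illustrative value
  .config("spark.shuffle.io.retryWait", "30s") // illustrative value
  .getOrCreate()
{code}

This only papers over the symptom; the underlying behavior change in remote cache-block reads is what this ticket tracks.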
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-24578:
---------------------------------
    Fix Version/s: 2.4.0
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid updated SPARK-24578:
---------------------------------
    Priority: Blocker  (was: Major)
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid updated SPARK-24578:
---------------------------------
    Target Version/s: 2.3.2, 2.4.0
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenbo Zhao updated SPARK-24578:
-------------------------------
    Description: (edited; the current text is quoted in the first message above)
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenbo Zhao updated SPARK-24578:
-------------------------------
    Description: (edited; the current text is quoted in the first message above)
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Li Jin updated SPARK-24578:
---------------------------
    Component/s: Spark Core
                 (was: Input/Output)
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenbo Zhao updated SPARK-24578:
-------------------------------
    Description: (edited; the current text is quoted in the first message above)
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenbo Zhao updated SPARK-24578:
-------------------------------
    Description: (edited; the current text is quoted in the first message above)
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenbo Zhao updated SPARK-24578:
-------------------------------
    Description: (edited; the current text is quoted in the first message above)