[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2024-02-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-24578:
---
Labels: pull-request-available  (was: )

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Assignee: Wenbo Zhao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 2.3.2, 2.4.0
>
>
> After Spark 2.3, we observed lots of errors like the following in some of our
> production jobs:
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproduction for a cluster of 2 executors (say host-1 and host-2), each with 8 cores.
> The memory of the driver and executors is not an important factor here as long as it is big enough, say 20G.
> {code:java}
> val n = 1
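> // n controls the row count of the test DataFrame; it has to be large enough that the cached DataFrame reaches the ~7 GB size described below.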
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0
>   .withColumn("x0", rand())
>   .withColumn("x1", rand())
>   .withColumn("x2", rand())
>   .withColumn("x3", rand())
>   .withColumn("x4", rand())
>   .withColumn("x5", rand())
>   .withColumn("x6", rand())
>   .withColumn("x7", rand())
>   .withColumn("x8", rand())
>   .withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() }
> {code}
>  
> In the above example, we generate a random DataFrame of around 7G in size, cache it, and then perform parallel
> DataFrame operations using `Array.par.map`. Because of the parallel computation, it is highly likely that some task
> will be scheduled on host-2 and need to read cached block data from host-1. This follows the code path of
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
> and then tries to transfer a big cached block (~500MB) from host-1 to host-2. Often, this big transfer makes the
> cluster suffer timeout issues (it will retry 3 times, each with a 120s timeout, and then recompute the cached block).

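As a possible workaround sketch (not part of this issue's fix), one might raise Spark's network timeout so that a large remote cached-block fetch gets more than the default 120s mentioned above; the 600s value and the application name below are illustrative assumptions, not recommendations from this issue.

{code:java}
// Workaround sketch only: give large remote cached-block fetches more time than
// the default before the fetch times out and is retried.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("remote-cache-block-repro")                // hypothetical app name
  .config("spark.network.timeout", "600s")            // default is 120s, matching the timeout described above
  .getOrCreate()
{code}
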
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-20 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-24578:
-
Fix Version/s: 2.4.0


[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-20 Thread Imran Rashid (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid updated SPARK-24578:
-
Priority: Blocker  (was: Major)


[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-20 Thread Imran Rashid (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid updated SPARK-24578:
-
Target Version/s: 2.3.2, 2.4.0


[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Li Jin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Jin updated SPARK-24578:
---
Component/s: Spark Core  (was: Input/Output)


[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
In the above example, we generated a random DataFrame of around 7G in size, cached it, and then performed parallel
DataFrame operations using `Array.par.map`. Because of the parallel computation, it is highly likely that some task
will be scheduled on host-2 and need to read cached block data from host-1. This follows the code path of
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
and then tries to transfer a big cached block (~600MB) from host-1 to host-2. Often, this big transfer makes the
cluster suffer timeout issues.

We couldn't reproduce the same issue in Spark 2.2.1. From the log of