[jira] [Comment Edited] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517532#comment-16517532 ] Wenbo Zhao edited comment on SPARK-24578 at 6/19/18 8:55 PM: - woop, [~attilapiros], sorry, I didn't know you have created a PR. was (Author: wbzhao): woop, [~attilapiros], sorry, I didn't you have created a PR. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColum
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517532#comment-16517532 ] Wenbo Zhao commented on SPARK-24578: woop, [~attilapiros], sorry, I didn't you have created a PR. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count > (1 to 10).toArray.par.map { i => println(i);
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517465#comment-16517465 ] Wenbo Zhao commented on SPARK-24578: [~attilapiros] if don't mind, I could create a PR for it :) > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count > (1 to 10).toArray.par.map { i => println(i); >
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517458#comment-16517458 ] Wenbo Zhao commented on SPARK-24578: Hi [~vanzin]. the commit [https://github.com/apache/spark/commit/16612638f0539f197eb7deb1be2ec53fed60d707#diff-a1f697fbd4610dff4eb8ff848e8cea2a] is only included since 2.3.0. If I understand right, the root cause is that we don't do `consolidateIfNeeded` anymore for many small chunks which causes the buf.notBuffer() has bad performance in the case that we have to call `copyByteBuf()` many times. In my example, the many times = 500MB / 256 KB. [~attilapiros] I applied {code:java} ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code} in our spark distribution and ran through the test code in this jira. This works fine. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517441#comment-16517441 ] Wenbo Zhao commented on SPARK-24578: [~irashid] Yes, that is exactly what I saw in our side. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count > (1 to 10).toArray.par.map { i => println(i); > df.
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517440#comment-16517440 ] Wenbo Zhao commented on SPARK-24578: Hi [~attilapiros], I guess what you suggest is {code:java} ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code} Will report back how my test goes. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withCo
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517180#comment-16517180 ] Wenbo Zhao commented on SPARK-24578: After digging more details, this commit [https://github.com/apache/spark/commit/16612638f0539f197eb7deb1be2ec53fed60d707#diff-a1f697fbd4610dff4eb8ff848e8cea2a] came to my attention. I reverted this commit in our Spark distribution and found out the issue went away. I also examined the log to verify that performance of [https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L142] is back to normal. It is able to transfer the 500MB data in 2s~3s. However, it is still not quite clear why that commit is causing the issue. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516621#comment-16516621 ] Wenbo Zhao commented on SPARK-24578: Hi [~irashid], many thanks for clarifying my questions. I tried to compare the logs between Spark 2.2.1 and 2.3.0 to see if there something interesting but out of luck so far. I added a few logging to benchmark how much time we need to transfer ~500MB data in the [https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L142]. In Spark 2.2.1, it only took <= 2s, but in Spark 2.3.0, it is very slow and never got finish with 120s and thus timeout. Are you able to reproduce the issue? > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an impo
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516609#comment-16516609 ] Wenbo Zhao commented on SPARK-24578: For now, we could reproduce this issue in completely different env and different distribution of Spark 2.3.0. We also observed that by setting spark.locality.wait to be a big number, e.g. 60s could help but this reduces the our system throughput significantly. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).wit
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24578: --- Description: After Spark 2.3, we observed lots of errors like the following in some of our production job {code:java} 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) {code} Here is a small reproducible for a small cluster of 2 executors (say host-1 and host-2) each with 8 cores. Here, the memory of driver and executors are not an import factor here as long as it is big enough, say 20G. {code:java} val n = 1 val df0 = sc.parallelize(1 to n).toDF val df = df0.withColumn("x0", rand()).withColumn("x0", rand() ).withColumn("x1", rand() ).withColumn("x2", rand() ).withColumn("x3", rand() ).withColumn("x4", rand() ).withColumn("x5", rand() ).withColumn("x6", rand() ).withColumn("x7", rand() ).withColumn("x8", rand() ).withColumn("x9", rand()) df.cache; df.count (1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() } {code} In the above example, we generate a random DataFrame of size around 7G; cache it and then perform a parallel DataFrame operations by using `array.par.map`. Because of the parallel computation, with high possibility, some task could be scheduled to a host-2 where it needs to read the cache block data from host-1. This follows the code path of [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] and then tries to transfer a big block (~ 500MB) of cache block from host-1 to host-2. Often, this big transfer makes the cluster suffer time out issue (it will retry 3 times, each with 120s timeou
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516602#comment-16516602 ] Wenbo Zhao commented on SPARK-24578: [~irashid] We didn't touch "spark.maxRemoteBlockSizeFetchToMem". You are right. After digging more details, I don't think that commit is relevant, > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores. Here, the memory of driver and executors are > not an import factor here as long as it is big enough, say 20G. > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withCol
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24578: --- Description: After Spark 2.3, we observed lots of errors like the following in some of our production job {code:java} 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) {code} Here is a small reproducible for a small cluster of 2 executors (say host-1 and host-2) each with 8 cores. Here, the memory of driver and executors are not an import factor here as long as it is big enough, say 20G. {code:java} val n = 1 val df0 = sc.parallelize(1 to n).toDF val df = df0.withColumn("x0", rand()).withColumn("x0", rand() ).withColumn("x1", rand() ).withColumn("x2", rand() ).withColumn("x3", rand() ).withColumn("x4", rand() ).withColumn("x5", rand() ).withColumn("x6", rand() ).withColumn("x7", rand() ).withColumn("x8", rand() ).withColumn("x9", rand()) df.cache; df.count (1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() } {code} In the above example, we generate a random DataFrame of size around 7G; cache it and then perform a parallel DataFrame operations by using `array.par.map`. Because of the parallel computation, with high possibility, some task could be scheduled to a host-2 where it needs to read the cache block data from host-1. This follows the code path of [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] and then tries to transfer a big block (~ 500MB) of cache block from host-1 to host-2. Often, this big transfer makes the cluster suffer time out issue (it will retry 3 times, each with 120s timeou
[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515858#comment-16515858 ] Wenbo Zhao commented on SPARK-24578: An easier reproduciable cluster setting is 10 executors each with 2 cores and 15G memory. > Reading remote cache block behavior changes and causes timeout issue > > > Key: SPARK-24578 > URL: https://issues.apache.org/jira/browse/SPARK-24578 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0, 2.3.1 >Reporter: Wenbo Zhao >Priority: Major > > After Spark 2.3, we observed lots of errors like the following in some of our > production job > {code:java} > 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result > ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, > chunkIndex=0}, > buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to > /172.22.18.7:60865; closing connection > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) > at > org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) > at > org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) > at > io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) > at > io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) > at > io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) > at > io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) > at > io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) > at > io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) > at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) > at > io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) > {code} > > Here is a small reproducible for a small cluster of 2 executors (say host-1 > and host-2) each with 8 cores (the memory of driver and executors are not a > import factor here as long as it is big enough, say 20G). > {code:java} > val n = 1 > val df0 = sc.parallelize(1 to n).toDF > val df = df0.withColumn("x0", rand()).withColumn("x0", rand() > ).withColumn("x1", rand() > ).withColumn("x2", rand() > ).withColumn("x3", rand() > ).withColumn("x4", rand() > ).withColumn("x5", rand() > ).withColumn("x6", rand() > ).withColumn("x7", rand() > ).withColumn("x8", rand() > ).withColumn("x9", rand()) > df.cache; df.count > (1 to 10).toArray.par.m
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24578: --- Description: After Spark 2.3, we observed lots of errors like the following in some of our production job {code:java} 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) {code} Here is a small reproducible for a small cluster of 2 executors (say host-1 and host-2) each with 8 cores (the memory of driver and executors are not a import factor here as long as it is big enough, say 20G). {code:java} val n = 1 val df0 = sc.parallelize(1 to n).toDF val df = df0.withColumn("x0", rand()).withColumn("x0", rand() ).withColumn("x1", rand() ).withColumn("x2", rand() ).withColumn("x3", rand() ).withColumn("x4", rand() ).withColumn("x5", rand() ).withColumn("x6", rand() ).withColumn("x7", rand() ).withColumn("x8", rand() ).withColumn("x9", rand()) df.cache; df.count (1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() } {code} In the above example, we generated a random DataFrame of size around 7G; cache it and then did a parallel DataFrame operations by using `array.par.map`. Because of the parallel computation, with high possibility, some task will be scheduled to a host-2 where it needs to read the cache block data from host-1. This will follow the code path of [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] then try to transfer a big block (~ 600MB) of cache block from host-1 to host-2. Often, this big transfer made the cluster suffer time out issue (it will retry 3 times, each with 120s timeout, and then do
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24578: --- Description: After Spark 2.3, we observed lots of errors like the following {code:java} 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) {code} Here is a small reproducible for a small cluster of 2 executors (say host-1 and host-2) each with 8 cores (the memory of driver and executors are not a import factor here as long as it is big enough, say 20G). {code:java} val n = 1 val df0 = sc.parallelize(1 to n).toDF val df = df0.withColumn("x0", rand()).withColumn("x0", rand() ).withColumn("x1", rand() ).withColumn("x2", rand() ).withColumn("x3", rand() ).withColumn("x4", rand() ).withColumn("x5", rand() ).withColumn("x6", rand() ).withColumn("x7", rand() ).withColumn("x8", rand() ).withColumn("x9", rand()) df.cache; df.count (1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() } {code} In the above example, we generated a random DataFrame of size around 7G; cache it and then did a parallel DataFrame operations by using `array.par.map`. Because of the parallel computation, with high possibility, some task will be scheduled to a host-2 where it needs to read the cache block data from host-1. This will follow the code path of [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] then try to transfer a big block (~ 600MB) of cache block from host-1 to host-2. Often, this big transfer made the cluster suffer time out issue (it will retry 3 times, each with 120s timeout, and then do recompute to put the cache bl
[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24578: --- Description: After Spark 2.3, we observed lots of errors like the following {code:java} 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId {streamId=91672904003, chunkIndex=0} , buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) {code} Here is a small reproducible for a small cluster of 2 executors (say host-1 and host-2) each with 8 cores (the memory of driver and executors are not a import factor here as long as it is big enough, say 10G). {code:java} val n = 1 val df0 = sc.parallelize(1 to n).toDF val df = df0.withColumn("x0", rand()).withColumn("x0", rand() ).withColumn("x1", rand() ).withColumn("x2", rand() ).withColumn("x3", rand() ).withColumn("x4", rand() ).withColumn("x5", rand() ).withColumn("x6", rand() ).withColumn("x7", rand() ).withColumn("x8", rand() ).withColumn("x9", rand()) df.cache; df.count (1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() } {code} In the above example, we generated a random DataFrame of size around 7G; cache it and then did a parallel DataFrame operations by using `array.par.map`. Because of the parallel computation, with high possibility, some task will be scheduled to a host-2 where the task needs to read the cache block data from host-1. This will follow the code path of [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] then try to transfer a big block (~ 600MB) of cache block from host-1 to host-2. Often, this big transfer made the cluster suffer time out issue. We couldn't to reproduce the same issue in Spark 2.2.1. From the log of Sp
[jira] [Created] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
Wenbo Zhao created SPARK-24578: -- Summary: Reading remote cache block behavior changes and causes timeout issue Key: SPARK-24578 URL: https://issues.apache.org/jira/browse/SPARK-24578 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 2.3.1, 2.3.0 Reporter: Wenbo Zhao After Spark 2.3, we observed lots of errors like the following 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess\{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156) at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142) at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123) at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983) at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248) at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) Here is a small reproducible for a small cluster of 2 executors each with 8 cores (the memory of driver and executors are not a import factor here as long as it is big enough, say 10G). val n = 1 val df0 = sc.parallelize(1 to n).toDF val df = df0.withColumn("x0", rand()).withColumn("x0", rand() ).withColumn("x1", rand() ).withColumn("x2", rand() ).withColumn("x3", rand() ).withColumn("x4", rand() ).withColumn("x5", rand() ).withColumn("x6", rand() ).withColumn("x7", rand() ).withColumn("x8", rand() ).withColumn("x9", rand()) df.cache; df.count (1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() } In the above example, we generated a random DataFrame of size around 7G; cache it and then did a parallel Dataframe operations by using `array.par.map`. Because of the parallel computation, with high possibility, some task will be scheduled to a host-2 where the task needs to
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493569#comment-16493569 ] Wenbo Zhao commented on SPARK-24373: [~mgaido] Thanks. I didn't look the comment carefully. > "df.cache() df.count()" no longer eagerly caches data when the analyzed plans > are different after re-analyzing the plans > > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Assignee: Marco Gaido >Priority: Blocker > Fix For: 2.3.1, 2.4.0 > > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24373: --- Comment: was deleted (was: Same question as [~icexelloss]. Also, any plan to make your fix into a more complete status, e.g. also fix the {{flatMapGroupsInPandas}} ? ) > "df.cache() df.count()" no longer eagerly caches data when the analyzed plans > are different after re-analyzing the plans > > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Assignee: Marco Gaido >Priority: Blocker > Fix For: 2.3.1, 2.4.0 > > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493545#comment-16493545 ] Wenbo Zhao commented on SPARK-24373: Same question as [~icexelloss]. Also, any plan to make your fix into a more complete status, e.g. also fix the {{flatMapGroupsInPandas}} ? > "df.cache() df.count()" no longer eagerly caches data when the analyzed plans > are different after re-analyzing the plans > > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Assignee: Marco Gaido >Priority: Blocker > Fix For: 2.3.1, 2.4.0 > > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489820#comment-16489820 ] Wenbo Zhao commented on SPARK-24373: I guess we should use `planWithBarrier` in the 'RelationalGroupedDataset' or other similar places. Any suggestion? > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489534#comment-16489534 ] Wenbo Zhao commented on SPARK-24373: It is not apparently to me that they are the same issue though > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489233#comment-16489233 ] Wenbo Zhao edited comment on SPARK-24373 at 5/24/18 3:37 PM: - I turned on the log trace of RuleExecutor and found that in my example of df1.count() after cache. {code:java} scala> df1.groupBy().count().explain(true) === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF === Aggregate [count(1) AS count#40L] Aggregate [count(1) AS count#40L] !+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] +- ExternalRDD [obj#1L] {code} that is node {code:java} Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]{code} becomes {code:java} Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]{code} This will cause a miss in the CacheManager? which could be confirmed by later applying ColumnPrunning rule's log trace. May question is: is that supposed protected by AnalysisBarrier ? was (Author: wbzhao): I turned on the log trace of RuleExecutor and found that in my example of df1.count() after cache. {code:java} scala> df1.groupBy().count().explain(true) === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF === Aggregate [count(1) AS count#40L] Aggregate [count(1) AS count#40L] !+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] +- ExternalRDD [obj#1L] {code} that is node {code:java} Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]{code} becomes {code:java} Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]{code} This will cause a miss in the CacheManager? which could be confirmed by later applying ColumnPrunning rule's log trace. May question is: is that supposed protected by AnalysisBarrier ? > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.gr
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489233#comment-16489233 ] Wenbo Zhao commented on SPARK-24373: I turned on the log trace of RuleExecutor and found that in my example of df1.count() after cache. {code:java} scala> df1.groupBy().count().explain(true) === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF === Aggregate [count(1) AS count#40L] Aggregate [count(1) AS count#40L] !+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] +- ExternalRDD [obj#1L] {code} that is node {code:java} Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]{code} becomes {code:java} Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]{code} This will cause a miss in the CacheManager? which could be confirmed by later applying ColumnPrunning rule's log trace. May question is: is that supposed protected by AnalysisBarrier ? > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To
[jira] [Updated] (SPARK-24373) Spark Dataset groupby.agg/count doesn't respect cache with UDF
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24373: --- Description: Here is the code to reproduce in local mode {code:java} scala> val df = sc.range(1, 2).toDF df: org.apache.spark.sql.DataFrame = [value: bigint] scala> val myudf = udf({x: Long => println(""); x + 1}) myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,LongType,Some(List(LongType))) scala> val df1 = df.withColumn("value1", myudf(col("value"))) df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] scala> df1.cache res0: df1.type = [value: bigint, value1: bigint] scala> df1.count res1: Long = 1 scala> df1.count res2: Long = 1 scala> df1.count res3: Long = 1 {code} in Spark 2.2, you could see it prints "". In the above example, when you do explain. You could see {code:java} scala> df1.explain(true) == Parsed Logical Plan == 'Project [value#2L, UDF('value) AS value1#5] +- AnalysisBarrier +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == value: bigint, value1: bigint Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Optimized Logical Plan == InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] == Physical Plan == *(1) InMemoryTableScan [value#2L, value1#5L] +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] {code} but the ImMemoryTableScan is mising in the following explain() {code:java} scala> df1.groupBy().count().explain(true) == Parsed Logical Plan == Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == count: bigint Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Optimized Logical Plan == Aggregate [count(1) AS count#170L] +- Project +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Physical Plan == *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) +- Exchange SinglePartition +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L]) +- *(1) Project +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] {code} was: Here is the code to reproduce in local mode {code:java} scala> val df = sc.range(1, 2).toDF df: org.apache.spark.sql.DataFrame = [value: bigint] scala> val myudf = udf({x: Long => println(""); x + 1}) scala> val df1 = df.withColumn("value1", myudf(col("value"))) scala> df1.cache() res0: df1.type = [value: bigint, value1: bigint] scala> df1.count res1: Long = 1 scala> df1.count res2: Long = 1 scala> df1.count res3: Long = 1 {code} in Spark 2.2, you could see it prints "". In the above example, when you do explain. You could see {code:java} scala> df1.explain(true) == Parsed Logical Plan == 'Project [value#2L, UDF('value) AS value1#5] +- AnalysisBarrier +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == value: bigint, value1: bigint Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Optimized Logical Plan == InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] == Physical Plan == *(1) InMemoryTableScan [value#2L, value1#5L] +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] {code} but the ImMemoryTableScan is mising in the following explain() {code:java} scala> df1.groupBy().count().explain(true) == Parsed Logical Plan == Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value
[jira] [Updated] (SPARK-24373) Spark Dataset groupby.agg/count doesn't respect cache with UDF
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24373: --- Description: Here is the code to reproduce in local mode {code:java} scala> val df = sc.range(1, 2).toDF df: org.apache.spark.sql.DataFrame = [value: bigint] scala> val myudf = udf({x: Long => println(""); x + 1}) scala> val df1 = df.withColumn("value1", myudf(col("value"))) scala> df1.cache() res0: df1.type = [value: bigint, value1: bigint] scala> df1.count res1: Long = 1 scala> df1.count res2: Long = 1 scala> df1.count res3: Long = 1 {code} in Spark 2.2, you could see it prints "". In the above example, when you do explain. You could see {code:java} scala> df1.explain(true) == Parsed Logical Plan == 'Project [value#2L, UDF('value) AS value1#5] +- AnalysisBarrier +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == value: bigint, value1: bigint Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Optimized Logical Plan == InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] == Physical Plan == *(1) InMemoryTableScan [value#2L, value1#5L] +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] {code} but the ImMemoryTableScan is mising in the following explain() {code:java} scala> df1.groupBy().count().explain(true) == Parsed Logical Plan == Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == count: bigint Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Optimized Logical Plan == Aggregate [count(1) AS count#170L] +- Project +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Physical Plan == *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) +- Exchange SinglePartition +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L]) +- *(1) Project +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] {code} was: Here is the code to reproduce in local mode {code:java} val df = sc.range(1, 2).toDF val myudf = udf({x: Long => println(""); x + 1}) scala> df1.cache() res0: df1.type = [value: bigint, value1: bigint] scala> df1.count res1: Long = 1 scala> df1.count res2: Long = 1 scala> df1.count res3: Long = 1 {code} in Spark 2.2, you could see it prints "". In the above example, when you do explain. You could see {code:java} scala> df1.explain(true) == Parsed Logical Plan == 'Project [value#2L, UDF('value) AS value1#5] +- AnalysisBarrier +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == value: bigint, value1: bigint Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Optimized Logical Plan == InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] == Physical Plan == *(1) InMemoryTableScan [value#2L, value1#5L] +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] {code} but the ImMemoryTableScan is mising in the following explain() {code:java} scala> df1.groupBy().count().explain(true) == Parsed Logical Plan == Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == count: bigint Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0,
[jira] [Updated] (SPARK-24373) Spark Dataset groupby.agg/count doesn't respect cache with UDF
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbo Zhao updated SPARK-24373: --- Summary: Spark Dataset groupby.agg/count doesn't respect cache with UDF (was: Spark Dataset groupby.agg/count doesn't respect cache) > Spark Dataset groupby.agg/count doesn't respect cache with UDF > -- > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: Input/Output >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > val df = sc.range(1, 2).toDF > val myudf = udf({x: Long => println(""); x + 1}) > scala> df1.cache() > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24373) Spark Dataset groupby.agg/count doesn't respect cache
Wenbo Zhao created SPARK-24373: -- Summary: Spark Dataset groupby.agg/count doesn't respect cache Key: SPARK-24373 URL: https://issues.apache.org/jira/browse/SPARK-24373 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 2.3.0 Reporter: Wenbo Zhao Here is the code to reproduce in local mode {code:java} val df = sc.range(1, 2).toDF val myudf = udf({x: Long => println(""); x + 1}) scala> df1.cache() res0: df1.type = [value: bigint, value1: bigint] scala> df1.count res1: Long = 1 scala> df1.count res2: Long = 1 scala> df1.count res3: Long = 1 {code} in Spark 2.2, you could see it prints "". In the above example, when you do explain. You could see {code:java} scala> df1.explain(true) == Parsed Logical Plan == 'Project [value#2L, UDF('value) AS value1#5] +- AnalysisBarrier +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == value: bigint, value1: bigint Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Optimized Logical Plan == InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] == Physical Plan == *(1) InMemoryTableScan [value#2L, value1#5L] +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] {code} but the ImMemoryTableScan is mising in the following explain() {code:java} scala> df1.groupBy().count().explain(true) == Parsed Logical Plan == Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Analyzed Logical Plan == count: bigint Aggregate [count(1) AS count#170L] +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Optimized Logical Plan == Aggregate [count(1) AS count#170L] +- Project +- SerializeFromObject [input[0, bigint, false] AS value#2L] +- ExternalRDD [obj#1L] == Physical Plan == *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) +- Exchange SinglePartition +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L]) +- *(1) Project +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] +- Scan ExternalRDDScan[obj#1L] {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org