[jira] [Comment Edited] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517532#comment-16517532
 ] 

Wenbo Zhao edited comment on SPARK-24578 at 6/19/18 8:55 PM:
-

Whoops, [~attilapiros], sorry, I didn't know you had created a PR. 


was (Author: wbzhao):
woop, [~attilapiros], sorry, I didn't you have created a PR. 

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs:
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible example for a small cluster of 2 executors (say 
> host-1 and host-2), each with 8 cores. The memory of the driver and executors 
> is not an important factor here as long as it is big enough, say 20 GB. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i); 
> df.groupBy("x1").agg(count("value")).show() }
> {code}

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517532#comment-16517532
 ] 

Wenbo Zhao commented on SPARK-24578:


Whoops, [~attilapiros], sorry, I didn't know you had created a PR. 


[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517465#comment-16517465
 ] 

Wenbo Zhao commented on SPARK-24578:


[~attilapiros], if you don't mind, I could create a PR for it :)


[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517458#comment-16517458
 ] 

Wenbo Zhao commented on SPARK-24578:


Hi [~vanzin], the commit 
[https://github.com/apache/spark/commit/16612638f0539f197eb7deb1be2ec53fed60d707#diff-a1f697fbd4610dff4eb8ff848e8cea2a]
 is only included since 2.3.0. If I understand correctly, the root cause is that 
we no longer do `consolidateIfNeeded` for the many small chunks, which makes 
`buf.nioBuffer()` perform badly when we have to call `copyByteBuf()` many times. 
In my example, "many times" is 500 MB / 256 KB, i.e. roughly 2000 calls.

[~attilapiros] I applied 
{code:java}
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), 
    Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code}
in our Spark distribution and ran the test code in this JIRA. It works fine. 
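
For anyone following along, here is a minimal sketch of the difference being 
discussed. This is not the actual Spark source: only 
`MessageWithHeader.copyByteBuf()`, `NIO_BUFFER_LIMIT` and the capped 
`nioBuffer()` call come from the stack trace and snippet above; the two methods 
below are simplified illustrations of the old and the changed behavior.
{code:java}
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Simplified sketch, not the exact MessageWithHeader source.
class CopyByteBufSketch {
  // Spark caps each socket write at 256 KB.
  private static final int NIO_BUFFER_LIMIT = 256 * 1024;

  // Roughly the Spark 2.3.0 behavior: materialize an NIO view of *all* remaining
  // bytes, then write at most NIO_BUFFER_LIMIT of them. For an unconsolidated
  // CompositeByteBuf backing a ~500 MB block, nioBuffer() walks (and may copy)
  // every component on every call, and transferTo() re-enters here ~2000 times.
  static int copyByteBufSlow(ByteBuf buf, WritableByteChannel target) throws IOException {
    ByteBuffer whole = buf.nioBuffer();
    whole.limit(whole.position() + Math.min(whole.remaining(), NIO_BUFFER_LIMIT));
    int written = target.write(whole);
    buf.skipBytes(written);
    return written;
  }

  // The change applied above: ask the ByteBuf for a view of at most
  // NIO_BUFFER_LIMIT bytes, so each call only touches the components that back
  // that small slice instead of the whole 500 MB buffer.
  static int copyByteBufCapped(ByteBuf buf, WritableByteChannel target) throws IOException {
    ByteBuffer slice = buf.nioBuffer(buf.readerIndex(),
        Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));
    int written = target.write(slice);
    buf.skipBytes(written);
    return written;
  }
}
{code}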

 


[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517441#comment-16517441
 ] 

Wenbo Zhao commented on SPARK-24578:


[~irashid] Yes, that is exactly what I saw on our side.


[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517440#comment-16517440
 ] 

Wenbo Zhao commented on SPARK-24578:


Hi [~attilapiros], I guess what you are suggesting is 
{code:java}
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), 
    Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT));{code}
I will report back on how my test goes.


[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-19 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517180#comment-16517180
 ] 

Wenbo Zhao commented on SPARK-24578:


After digging into more details, this commit 
[https://github.com/apache/spark/commit/16612638f0539f197eb7deb1be2ec53fed60d707#diff-a1f697fbd4610dff4eb8ff848e8cea2a]
 came to my attention. I reverted it in our Spark distribution and found that the 
issue went away.

I also examined the logs to verify that the performance of 
[https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L142]
 is back to normal: it is able to transfer the 500 MB of data in 2s~3s. 

However, it is still not quite clear why that commit causes the issue.


[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516621#comment-16516621
 ] 

Wenbo Zhao commented on SPARK-24578:


Hi [~irashid], many thanks for clarifying my questions. I tried to compare the 
logs between Spark 2.2.1 and 2.3.0 to see if there is anything interesting, but 
no luck so far. I added some logging to benchmark how much time we need to 
transfer ~500 MB of data in 
[https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L142].
 In Spark 2.2.1 it only took <= 2s, but in Spark 2.3.0 it is very slow and never 
finishes within 120s, and thus times out. 

Are you able to reproduce the issue?
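
The logging itself is not included in this ticket; purely as a hypothetical 
illustration (every name here is invented), a timing helper of the kind 
described could look like this, called once per `copyByteBuf()` invocation:
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

// Hypothetical instrumentation sketch, not the logging actually used above.
final class TransferTimer {
  private static final Logger log = LoggerFactory.getLogger(TransferTimer.class);

  private final long expectedBytes; // e.g. ~500 MB for the cached block above
  private long written;
  private long startNanos = -1L;

  TransferTimer(long expectedBytes) {
    this.expectedBytes = expectedBytes;
  }

  // Record the number of bytes a single copyByteBuf() call wrote; log the total
  // elapsed time once the whole block has been transferred.
  void record(int bytesWritten) {
    if (startNanos < 0L) {
      startNanos = System.nanoTime();
    }
    written += bytesWritten;
    if (written >= expectedBytes) {
      long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
      log.info("Transferred {} MB in {} ms", written >> 20, elapsedMs);
    }
  }
}
{code}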


[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516609#comment-16516609
 ] 

Wenbo Zhao commented on SPARK-24578:


For now, we can reproduce this issue in completely different environments and 
with different distributions of Spark 2.3.0. We also observed that setting 
spark.locality.wait to a big number, e.g. 60s, helps, but this reduces our 
system throughput significantly.
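
For reference, the workaround mentioned above amounts to something like the 
following. This is a hedged example, not a recommended fix: it trades scheduling 
latency and overall throughput for fewer remote cached-block reads.
{code:java}
import org.apache.spark.SparkConf;

public class LocalityWaitWorkaround {
  public static void main(String[] args) {
    // Wait up to 60s for a node-local slot before scheduling a task on another
    // host, which avoids most of the large remote cached-block transfers.
    SparkConf conf = new SparkConf()
        .setAppName("locality-wait-workaround")
        .set("spark.locality.wait", "60s");
    // ... build the SparkContext / SparkSession from this conf as usual.
  }
}
{code}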


[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following in some of our 
production jobs:
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproducible example for a small cluster of 2 executors (say 
host-1 and host-2), each with 8 cores. The memory of the driver and executors is 
not an important factor here as long as it is big enough, say 20 GB. 
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generate a random DataFrame of size around 7 GB, cache 
it, and then perform parallel DataFrame operations using `array.par.map`. 
Because of the parallel computation, with high probability some tasks are 
scheduled on host-2, where they need to read the cached block data from host-1. 
This follows the code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then tries to transfer a big block (~500 MB) of the cached data from host-1 
to host-2. Often, this big transfer makes the cluster suffer from timeout issues 
(it will retry 3 times, each with 120s 
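
For context, the retry-and-timeout behavior mentioned above is governed, 
assuming default settings, by configuration along these lines; the sketch below 
only spells out those knobs (keys assumed) and is not a fix for the slow 
transfer itself:
{code:java}
import org.apache.spark.SparkConf;

public class FetchTimeoutKnobs {
  public static void main(String[] args) {
    // Defaults written out explicitly; raising them only hides the slow
    // 500 MB transfer rather than fixing it.
    SparkConf conf = new SparkConf()
        .setAppName("fetch-timeout-knobs")
        .set("spark.network.timeout", "120s")     // per-attempt timeout
        .set("spark.shuffle.io.maxRetries", "3")  // block fetch retry attempts
        .set("spark.shuffle.io.retryWait", "5s"); // wait between retries
  }
}
{code}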

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516602#comment-16516602
 ] 

Wenbo Zhao commented on SPARK-24578:


[~irashid] We didn't touch "spark.maxRemoteBlockSizeFetchToMem". You are right; 
after digging into more details, I don't think that commit is relevant.


[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following in some of our 
production job
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproducible example for a small cluster of 2 executors (say 
host-1 and host-2), each with 8 cores. The memory of the driver and executors is 
not an important factor here as long as it is big enough, say 20G. 
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generate a random DataFrame of size around 7G, cache 
it, and then perform parallel DataFrame operations using `array.par.map`. 
Because of the parallel computation, it is very likely that some task will be 
scheduled to host-2, where it needs to read the cached block data from host-1. 
This follows the code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then tries to transfer a big cached block (~ 500MB) from host-1 to 
host-2. Often, this big transfer makes the cluster suffer from timeout issues 
(it will retry 3 times, each with 120s 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515858#comment-16515858
 ] 

Wenbo Zhao commented on SPARK-24578:


An easier-to-reproduce cluster setting is 10 executors, each with 2 cores and 
15G memory.
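
For reference, a minimal sketch of how that layout could be requested through 
standard Spark configuration keys (the app name below is made up, and in 
practice these would be passed at submit time rather than set inside an 
already-running shell):
{code:java}
// Hedged sketch, not from the ticket: mirror the 10-executor / 2-core / 15G
// topology described above using standard Spark configuration keys.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-24578-repro")              // hypothetical app name
  .config("spark.executor.instances", "10")  // 10 executors
  .config("spark.executor.cores", "2")       // 2 cores per executor
  .config("spark.executor.memory", "15g")    // 15G per executor
  .getOrCreate()
{code}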

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible example for a small cluster of 2 executors (say 
> host-1 and host-2), each with 8 cores (the memory of the driver and executors 
> is not an important factor here as long as it is big enough, say 20G).
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following in some of our 
production job
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproducible example for a small cluster of 2 executors (say 
host-1 and host-2), each with 8 cores (the memory of the driver and executors is 
not an important factor here as long as it is big enough, say 20G).
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generated a random DataFrame of size around 7G, cached 
it, and then performed parallel DataFrame operations using `array.par.map`. 
Because of the parallel computation, it is very likely that some task will be 
scheduled to host-2, where it needs to read the cached block data from host-1. 
This will follow the code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then try to transfer a big cached block (~ 600MB) from host-1 to 
host-2. Often, this big transfer makes the cluster suffer from timeout issues 
(it will retry 3 times, each with 120s timeout, and then 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproducible example for a small cluster of 2 executors (say 
host-1 and host-2), each with 8 cores (the memory of the driver and executors is 
not an important factor here as long as it is big enough, say 20G).
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generated a random DataFrame of size around 7G, cached 
it, and then performed parallel DataFrame operations using `array.par.map`. 
Because of the parallel computation, it is very likely that some task will be 
scheduled to host-2, where it needs to read the cached block data from host-1. 
This will follow the code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then try to transfer a big cached block (~ 600MB) from host-1 to 
host-2. Often, this big transfer makes the cluster suffer from timeout issues 
(it will retry 3 times, each with 120s timeout, and then do a recompute to put 
the cache 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following

 
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
Here is a small reproducible example for a small cluster of 2 executors (say 
host-1 and host-2), each with 8 cores (the memory of the driver and executors is 
not an important factor here as long as it is big enough, say 10G).

{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generated a random DataFrame of size around 7G, cached 
it, and then performed parallel DataFrame operations using `array.par.map`. 
Because of the parallel computation, it is very likely that some task will be 
scheduled to host-2, where the task needs to read the cached block data from 
host-1. This will follow the code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then try to transfer a big cached block (~ 600MB) from host-1 to 
host-2. Often, this big transfer makes the cluster suffer from timeout issues. 
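
For what it's worth, the retry/timeout behaviour described above lines up with 
the standard Spark network settings. The sketch below only shows how those 
knobs could be raised as a stop-gap; which settings ultimately govern this 
particular fetch is my assumption, and raising them only masks the symptom 
rather than fixing the behavior change:
{code:java}
// Hedged sketch, not from the ticket: the defaults are 120s for the network
// timeout and 3 retries for IO-level fetch failures.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "600s")     // default is 120s
  .set("spark.shuffle.io.maxRetries", "3")  // default is 3 retries
  .set("spark.shuffle.io.retryWait", "5s")  // default wait between retries
{code}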

We couldn't reproduce the same issue in Spark 2.2.1. From the log of 

[jira] [Created] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)
Wenbo Zhao created SPARK-24578:
--

 Summary: Reading remote cache block behavior changes and causes 
timeout issue
 Key: SPARK-24578
 URL: https://issues.apache.org/jira/browse/SPARK-24578
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.3.1, 2.3.0
Reporter: Wenbo Zhao


After Spark 2.3, we observed lots of errors like the following
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
Here is a small reproducible example for a small cluster of 2 executors, each 
with 8 cores (the memory of the driver and executors is not an important factor 
here as long as it is big enough, say 10G).
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())
df.cache; df.count
(1 to 10).toArray.par.map { i => println(i);  
df.groupBy("x1").agg(count("value")).show() }
 

In the above example, we generated a random DataFrame of size around 7G, cached 
it, and then performed parallel DataFrame operations using `array.par.map`. 
Because of the parallel computation, it is very likely that some task will be 
scheduled to host-2 where the task needs to 

[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans

2018-05-29 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493569#comment-16493569
 ] 

Wenbo Zhao commented on SPARK-24373:


[~mgaido] Thanks. I didn't look at the comment carefully. 

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after re-analyzing the plans
> 
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Assignee: Marco Gaido
>Priority: Blocker
> Fix For: 2.3.1, 2.4.0
>
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans

2018-05-29 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24373:
---
Comment: was deleted

(was: Same question as [~icexelloss]. Also, is there any plan to make your fix 
more complete, e.g. also fix {{flatMapGroupsInPandas}}? )

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after re-analyzing the plans
> 
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Assignee: Marco Gaido
>Priority: Blocker
> Fix For: 2.3.1, 2.4.0
>
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans

2018-05-29 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493545#comment-16493545
 ] 

Wenbo Zhao commented on SPARK-24373:


Same question as [~icexelloss]. Also, is there any plan to make your fix more 
complete, e.g. also fix {{flatMapGroupsInPandas}}? 

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after re-analyzing the plans
> 
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Assignee: Marco Gaido
>Priority: Blocker
> Fix For: 2.3.1, 2.4.0
>
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-24 Thread Wenbo Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489820#comment-16489820
 ] 

Wenbo Zhao commented on SPARK-24373:


I guess we should use `planWithBarrier` in `RelationalGroupedDataset` or other 
similar places. Any suggestions?
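
To make the idea concrete, here is a hedged illustration (not the actual patch; 
the helper name is made up). `AnalysisBarrier` is the Spark 2.3 logical node 
behind `planWithBarrier`, and wrapping an already-analyzed plan in it keeps the 
analyzer from re-running rules such as HandleNullInputsForUDF over that subtree:
{code:java}
// Hedged sketch only: protect an analyzed plan from a second analysis pass.
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan}

def barriered(plan: LogicalPlan): LogicalPlan = plan match {
  case b: AnalysisBarrier => b           // already protected, keep as is
  case other              => AnalysisBarrier(other)
}
{code}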

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-24 Thread Wenbo Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489534#comment-16489534
 ] 

Wenbo Zhao commented on SPARK-24373:


It is not apparent to me that they are the same issue, though.

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-24 Thread Wenbo Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489233#comment-16489233
 ] 

Wenbo Zhao edited comment on SPARK-24373 at 5/24/18 3:37 PM:
-

I turned on the log trace of RuleExecutor and found the following in my example 
of df1.count() after cache:

 
{code:java}
scala> df1.groupBy().count().explain(true)

=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF ===
Aggregate [count(1) AS count#40L] Aggregate [count(1) AS count#40L]
!+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L] +- Project [value#2L, if (isnull(value#2L)) null else if 
(isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L] +- 
SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L] +- ExternalRDD [obj#1L]
{code}
That is, the node
{code:java}
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L]{code}
becomes 
{code:java}
Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null 
else UDF(value#2L) AS value1#5L]{code}
 

This causes a miss in the CacheManager, which can be confirmed by the log trace 
of the ColumnPruning rule applied later.

My question is: is that supposed to be protected by AnalysisBarrier?
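
To make the suspected CacheManager miss concrete, here is a hedged check of my 
own (an approximation of the lookup, not from the rule trace above): the cache 
substitution relies on plan equivalence, so once re-analysis adds the extra 
null guard, no subtree of the aggregate's analyzed plan matches the plan that 
was cached.
{code:java}
// Hedged sketch: CacheManager substitutes an InMemoryRelation only when a
// newly analyzed subtree is equivalent (sameResult) to the cached plan.
val cachedPlan = df1.queryExecution.analyzed
val aggPlan    = df1.groupBy().count().queryExecution.analyzed

// Look for any subtree of the aggregate's analyzed plan matching the cached one.
val wouldHitCache = aggPlan.collectFirst {
  case p if p.sameResult(cachedPlan) => p
}.isDefined
println(s"cache lookup would hit: $wouldHitCache") // expected: false when the bug triggers
{code}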


was (Author: wbzhao):
I turned on the log trace of RuleExecutor and found the following in my example 
of df1.count() after cache:

 
{code:java}
scala> df1.groupBy().count().explain(true)

=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF ===
Aggregate [count(1) AS count#40L] Aggregate [count(1) AS count#40L]
!+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L] +- Project [value#2L, if (isnull(value#2L)) null else if 
(isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L] +- 
SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L] +- ExternalRDD [obj#1L]
{code}
That is, the node
{code:java}
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L]{code}
becomes 
{code:java}
Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null 
else UDF(value#2L) AS value1#5L]{code}
 

This causes a miss in the CacheManager, which can be confirmed by the log trace 
of the ColumnPruning rule applied later.

My question is: is that supposed to be protected by AnalysisBarrier?

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> 

[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-24 Thread Wenbo Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489233#comment-16489233
 ] 

Wenbo Zhao commented on SPARK-24373:


I turned on the log trace of RuleExecutor and found the following in my example 
of df1.count() after cache:

 
{code:java}
scala> df1.groupBy().count().explain(true)

=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF ===
Aggregate [count(1) AS count#40L] Aggregate [count(1) AS count#40L]
!+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L] +- Project [value#2L, if (isnull(value#2L)) null else if 
(isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L] +- 
SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L] +- ExternalRDD [obj#1L]
{code}
That is, the node
{code:java}
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L]{code}
becomes 
{code:java}
Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null 
else UDF(value#2L) AS value1#5L]{code}
 

This causes a miss in the CacheManager, which can be confirmed by the log trace 
of the ColumnPruning rule applied later.

My question is: is that supposed to be protected by AnalysisBarrier?

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (SPARK-24373) Spark Dataset groupby.agg/count doesn't respect cache with UDF

2018-05-23 Thread Wenbo Zhao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24373:
---
Description: 
Here is the code to reproduce in local mode
{code:java}
scala> val df = sc.range(1, 2).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> val myudf = udf({x: Long => println(""); x + 1})
myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
UserDefinedFunction(,LongType,Some(List(LongType)))

scala> val df1 = df.withColumn("value1", myudf(col("value")))
df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]

scala> df1.cache
res0: df1.type = [value: bigint, value1: bigint]

scala> df1.count
res1: Long = 1 

scala> df1.count
res2: Long = 1

scala> df1.count
res3: Long = 1
{code}
 

In Spark 2.2, you could see that it prints "". 

In the above example, when you do explain(), you could see
{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
+- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
memory, deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

{code}
but the InMemoryTableScan is missing in the following explain()
{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
output=[count#175L])
+- *(1) Project
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]
{code}
 

 

  was:
Here is the code to reproduce in local mode
{code:java}
scala> val df = sc.range(1, 2).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]
scala> val myudf = udf({x: Long => println(""); x + 1})
scala> val df1 = df.withColumn("value1", myudf(col("value")))
scala> df1.cache()
res0: df1.type = [value: bigint, value1: bigint] 
scala> df1.count
res1: Long = 1
scala> df1.count
res2: Long = 1 
scala> df1.count 
res3: Long = 1
{code}
 

In Spark 2.2, you could see that it prints "". 

In the above example, when you do explain(), you could see
{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
+- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
memory, deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

{code}
but the InMemoryTableScan is missing in the following explain()
{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 

[jira] [Updated] (SPARK-24373) Spark Dataset groupby.agg/count doesn't respect cache with UDF

2018-05-23 Thread Wenbo Zhao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24373:
---
Description: 
Here is the code to reproduce in local mode
{code:java}
scala> val df = sc.range(1, 2).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]
scala> val myudf = udf({x: Long => println(""); x + 1})
scala> val df1 = df.withColumn("value1", myudf(col("value")))
scala> df1.cache()
res0: df1.type = [value: bigint, value1: bigint] 
scala> df1.count
res1: Long = 1
scala> df1.count
res2: Long = 1 
scala> df1.count 
res3: Long = 1
{code}
 

In Spark 2.2, you could see that it prints "". 

In the above example, when you do explain(), you could see
{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
+- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
memory, deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

{code}
but the InMemoryTableScan is missing in the following explain()
{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
output=[count#175L])
+- *(1) Project
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]
{code}
 

 

  was:
Here is the code to reproduce in local mode
{code:java}
val df = sc.range(1, 2).toDF
val myudf = udf({x: Long => println(""); x + 1})
scala> df1.cache()
res0: df1.type = [value: bigint, value1: bigint] 
scala> df1.count
res1: Long = 1
scala> df1.count
res2: Long = 1 
scala> df1.count 
res3: Long = 1
{code}
 

In Spark 2.2, you could see that it prints "". 

In the above example, when you do explain(), you could see
{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
+- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
memory, deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

{code}
but the InMemoryTableScan is missing in the following explain()
{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject 

[jira] [Updated] (SPARK-24373) Spark Dataset groupby.agg/count doesn't respect cache with UDF

2018-05-23 Thread Wenbo Zhao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24373:
---
Summary: Spark Dataset groupby.agg/count doesn't respect cache with UDF  
(was: Spark Dataset groupby.agg/count doesn't respect cache)

> Spark Dataset groupby.agg/count doesn't respect cache with UDF
> --
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> val df = sc.range(1, 2).toDF
> val myudf = udf({x: Long => println(""); x + 1})
> scala> df1.cache()
> res0: df1.type = [value: bigint, value1: bigint] 
> scala> df1.count
> res1: Long = 1
> scala> df1.count
> res2: Long = 1 
> scala> df1.count 
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see that it prints "". 
> In the above example, when you do explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24373) Spark Dataset groupby.agg/count doesn't respect cache

2018-05-23 Thread Wenbo Zhao (JIRA)
Wenbo Zhao created SPARK-24373:
--

 Summary: Spark Dataset groupby.agg/count doesn't respect cache
 Key: SPARK-24373
 URL: https://issues.apache.org/jira/browse/SPARK-24373
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.3.0
Reporter: Wenbo Zhao


Here is the code to reproduce in local mode
{code:java}
val df = sc.range(1, 2).toDF
val myudf = udf({x: Long => println(""); x + 1})
scala> df1.cache()
res0: df1.type = [value: bigint, value1: bigint] 
scala> df1.count
res1: Long = 1
scala> df1.count
res2: Long = 1 
scala> df1.count 
res3: Long = 1
{code}
 

In Spark 2.2, you could see that it prints "". 

In the above example, when you do explain(), you could see
{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
+- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
memory, deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

{code}
but the InMemoryTableScan is missing in the following explain()
{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
+- ExternalRDD [obj#1L]

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
output=[count#175L])
+- *(1) Project
+- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]
{code}
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org