[ https://issues.apache.org/jira/browse/SPARK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Saisai Shao updated SPARK-29114:
--------------------------------
    Priority: Major  (was: Blocker)

> Dataset<Row>.coalesce(10) throw ChunkFetchFailureException when original Dataset size is big
> --------------------------------------------------------------------------------------------
>
> Key: SPARK-29114
> URL: https://issues.apache.org/jira/browse/SPARK-29114
> Project: Spark
> Issue Type: Bug
> Components: Block Manager
> Affects Versions: 2.3.0
> Reporter: ZhanxiongWang
> Priority: Major
>
> I create a Dataset<Row> df with 200 partitions. I request 100 executors for the job, each with 1 core; driver memory is 8G and executor memory is 16G. I call df.cache() before df.coalesce(10). When the Dataset<Row> is small, the program works well. But when I increase the size of the Dataset<Row>, df.coalesce(10) throws ChunkFetchFailureException.
> 19/09/17 08:26:44 INFO CoarseGrainedExecutorBackend: Got assigned task 210
> 19/09/17 08:26:44 INFO Executor: Running task 0.0 in stage 3.0 (TID 210)
> 19/09/17 08:26:44 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
> 19/09/17 08:26:44 INFO TorrentBroadcast: Started reading broadcast variable 1003
> 19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 3.8 GB)
> 19/09/17 08:26:44 INFO TorrentBroadcast: Reading broadcast variable 1003 took 7 ms
> 19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 3.8 GB)
> 19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_0 locally
> 19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_1 locally
> 19/09/17 08:26:44 INFO TransportClientFactory: Successfully created connection to /100.76.29.130:54238 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 08:26:46 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_18, and will not retry (0 retries)
> org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
> at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
> at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
> at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)
> 19/09/17 08:26:46 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
> org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
> at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:115)
> at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:691)
> at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:634)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:747)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:802)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
> at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
> at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
> at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
> at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> ... 1 more
> 19/09/17 08:26:46 INFO NewHadoopRDD: Input split: 9.46.2.233:0935a68ad8000000,0935d7fb180999FF
> 19/09/17 08:26:46 INFO TorrentBroadcast: Started reading broadcast variable 93
> 19/09/17 08:26:46 INFO MemoryStore: Block broadcast_93_piece0 stored as bytes in memory (estimated size 32.5 KB, free 3.8 GB)
> 19/09/17 08:26:46 INFO TorrentBroadcast: Reading broadcast variable 93 took 8 ms
> 19/09/17 08:26:47 INFO MemoryStore: Block broadcast_93 stored as values in memory (estimated size 372.0 KB, free 3.8 GB)
> 19/09/17 08:26:47 INFO RecoverableZooKeeper: Process identifier=hconnection-0x1aa852f0 connecting to ZooKeeper ensemble=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181
> 19/09/17 08:26:47 INFO ZooKeeper: Initiating client connection, connectString=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181 sessionTimeout=90000 watcher=hconnection-0x1aa852f0, quorum=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181, baseZNode=/hbase-qq-mp-ss-slave
> 19/09/17 08:26:47 INFO ClientCnxn: Opening socket connection to server 10.254.82.84/10.254.82.84:2181. Will not attempt to authenticate using SASL (unknown error)
> 19/09/17 08:26:47 INFO ClientCnxn: Socket connection established to 10.254.82.84/10.254.82.84:2181, initiating session
> 19/09/17 08:26:47 INFO ClientCnxn: Session establishment complete on server 10.254.82.84/10.254.82.84:2181, sessionid = 0x36c7f371e67307f, negotiated timeout = 90000
> 19/09/17 08:26:47 INFO TableInputFormatBase: Input split length: 19.8 G bytes.
> 19/09/17 08:41:37 INFO HConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x36c7f371e67307f
> 19/09/17 08:41:38 INFO ZooKeeper: Session: 0x36c7f371e67307f closed
> 19/09/17 08:41:38 INFO ClientCnxn: EventThread shut down
> 19/09/17 08:41:38 INFO MemoryStore: Block rdd_1005_18 stored as values in memory (estimated size 1822.9 MB, free 2025.1 MB)
> 19/09/17 08:41:38 INFO TransportClientFactory: Successfully created connection to /9.10.29.145:37002 after 0 ms (0 ms spent in bootstraps)
> 19/09/17 08:41:39 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_32, and will not retry (0 retries)
> org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1800856515000, chunkIndex=0}: readerIndex: 0, writerIndex: -2138342822 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2138342822))
>
> Here are more details of the experiment.
> When the df size is small:
> * *Storage Level:* Memory Deserialized 1x Replicated
> * *Cached Partitions:* 200
> * *Total Partitions:* 200
> * *Memory Size:* 356.2 GB
> * *Disk Size:* 0.0 B
> h4. Data Distribution on 100 Executors
> *Storage on the 10 executors that cached the most data:*
> on heap memory usage : 5.3 GB (3.8 GB Remaining)
> off heap memory usage : 0.0 B (0.0 B Remaining)
> disk usage : 0.0 B
> *Storage on the remaining executors:*
> on heap memory usage : 3.6 GB (5.6 GB Remaining)
> off heap memory usage : 0.0 B (0.0 B Remaining)
> disk usage : 0.0 B
>
> The log of a successful task is as follows:
>
> 19/09/17 09:33:17 INFO CoarseGrainedExecutorBackend: Got assigned task 211
> 19/09/17 09:33:17 INFO Executor: Running task 3.0 in stage 3.0 (TID 211)
> 19/09/17 09:33:17 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
> 19/09/17 09:33:17 INFO TorrentBroadcast: Started reading broadcast variable 1003
> 19/09/17 09:33:17 INFO TransportClientFactory: Successfully created connection to /9.10.19.210:51072 after 8 ms (0 ms spent in bootstraps)
> 19/09/17 09:33:17 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 5.5 GB)
> 19/09/17 09:33:17 INFO TorrentBroadcast: Reading broadcast variable 1003 took 36 ms
> 19/09/17 09:33:18 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 5.5 GB)
> 19/09/17 09:33:18 INFO BlockManager: Found block rdd_1005_6 locally
> 19/09/17 09:33:18 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:37220 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:33:36 INFO BlockManager: Found block rdd_1005_33 remotely
> 19/09/17 09:33:37 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:32935 after 10 ms (0 ms spent in bootstraps)
> 19/09/17 09:33:50 INFO BlockManager: Found block rdd_1005_40 remotely
> 19/09/17 09:33:52 INFO TransportClientFactory: Successfully created connection to /100.76.25.87:45875 after 2 ms (0 ms spent in bootstraps)
> 19/09/17 09:34:06 INFO BlockManager: Found block rdd_1005_46 remotely
> 19/09/17 09:34:08 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:35134 after 32 ms (0 ms spent in bootstraps)
> 19/09/17 09:34:18 INFO BlockManager: Found block rdd_1005_48 remotely
> 19/09/17 09:34:20 INFO TransportClientFactory: Successfully created connection to /9.47.25.185:47504 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:34:42 INFO BlockManager: Found block rdd_1005_49 remotely
> 19/09/17 09:34:44 INFO TransportClientFactory: Successfully created connection to /100.76.33.91:35365 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:34:59 INFO BlockManager: Found block rdd_1005_51 remotely
> 19/09/17 09:35:01 INFO TransportClientFactory: Successfully created connection to /9.10.7.26:49383 after 3 ms (0 ms spent in bootstraps)
> 19/09/17 09:35:16 INFO BlockManager: Found block rdd_1005_71 remotely
> 19/09/17 09:35:18 INFO TransportClientFactory: Successfully created connection to /100.76.72.246:51684 after 2 ms (0 ms spent in bootstraps)
> 19/09/17 09:35:28 INFO BlockManager: Found block rdd_1005_75 remotely
> 19/09/17 09:35:30 INFO TransportClientFactory: Successfully created connection to /9.47.30.46:51291 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:35:45 INFO BlockManager: Found block rdd_1005_98 remotely
> 19/09/17 09:35:47 INFO TransportClientFactory: Successfully created connection to /9.10.137.17:56554 after 2 ms (0 ms spent in bootstraps)
> 19/09/17 09:36:00 INFO BlockManager: Found block rdd_1005_116 remotely
> 19/09/17 09:36:02 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:58951 after 2 ms (0 ms spent in bootstraps)
> 19/09/17 09:36:16 INFO BlockManager: Found block rdd_1005_121 remotely
> 19/09/17 09:36:19 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:50992 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:36:27 INFO BlockManager: Found block rdd_1005_128 remotely
> 19/09/17 09:36:39 INFO BlockManager: Found block rdd_1005_134 remotely
> 19/09/17 09:36:42 INFO TransportClientFactory: Successfully created connection to /9.10.7.92:41607 after 73 ms (0 ms spent in bootstraps)
> 19/09/17 09:36:54 INFO BlockManager: Found block rdd_1005_153 remotely
> 19/09/17 09:37:06 INFO BlockManager: Found block rdd_1005_167 remotely
> 19/09/17 09:37:08 INFO BlockManager: Found block rdd_1005_174 locally
> 19/09/17 09:37:08 INFO TransportClientFactory: Successfully created connection to /9.10.29.150:43709 after 10 ms (0 ms spent in bootstraps)
> 19/09/17 09:37:20 INFO BlockManager: Found block rdd_1005_182 remotely
> 19/09/17 09:37:22 INFO TransportClientFactory: Successfully created connection to /9.10.8.84:55958 after 14 ms (0 ms spent in bootstraps)
> 19/09/17 09:37:32 INFO BlockManager: Found block rdd_1005_189 remotely
> 19/09/17 09:37:34 INFO Executor: Finished task 3.0 in stage 3.0 (TID 211). 1752 bytes result sent to driver
>
> When I increase the size of the Dataset<Row>:
> * *Storage Level:* Disk Serialized 1x Replicated
> * *Cached Partitions:* 200
> * *Total Partitions:* 200
> * *Memory Size:* 390.8 GB
> * *Disk Size:* 166.8 GB
> h4. Data Distribution on 100 Executors
> *Storage on the 10 executors that cached the most data:*
> on heap memory usage : 7.2 GB (2008.3 MB Remaining)
> off heap memory usage : 0.0 B (0.0 B Remaining)
> disk usage : 18.1 GB
> *Storage on the remaining executors:*
> on heap memory usage : 3.6 GB (5.6 GB Remaining)
> off heap memory usage : 0.0 B (0.0 B Remaining)
> disk usage : 0.0 B
> In this situation, 10 executors use disk because df.coalesce(10) throws ChunkFetchFailureException: those 10 executors fetch the data from the original data source again and cache it in the new Dataset<Row>.
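
The negative writerIndex values in the failing logs can be read as byte counts that no longer fit in a signed 32-bit int. The sketch below only does that arithmetic. It assumes, without confirmation from this report, that each logged value is a block length that wrapped through an int. The class and method names are hypothetical, and nothing here calls Spark or Netty.

```java
public class WrappedLength {
    // Reinterpret a signed 32-bit value as the unsigned byte count
    // it may have wrapped from (assumption: the value is a length).
    public static long asUnsignedLength(int wrapped) {
        return Integer.toUnsignedLong(wrapped);
    }

    // True when a byte count cannot be held in a non-negative int.
    public static boolean exceedsIntRange(long length) {
        return length > Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // The two writerIndex values reported for rdd_1005_18 and rdd_1005_32.
        int[] logged = { -2137154997, -2138342822 };
        for (int w : logged) {
            long length = asUnsignedLength(w);
            System.out.println(w + " -> " + length
                    + " bytes, exceeds int range: " + exceedsIntRange(length));
        }
    }
}
```

Under that assumption the two failing blocks would be 2157812299 and 2156624474 bytes, both slightly above Integer.MAX_VALUE (2147483647). That would be consistent with the failure appearing only once the Dataset<Row> grows, but it is an interpretation of the log, not a confirmed root cause.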