[ https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chanh Le updated SPARK-7703:
----------------------------
    Comment: was deleted

(was: Any update on that? I have the same error too.
java.io.IOException: org.apache.spark.storage.BlockFetchException: Failed to fetch block from 1 locations. Most recent failure cause:
https://gist.github.com/giaosudau/3f7087707dcabc53c3b3bf54b0503720)

> Task failure caused by block fetch failure in BlockManager.doGetRemote() when
> using TorrentBroadcast
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7703
>                 URL: https://issues.apache.org/jira/browse/SPARK-7703
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.1, 1.3.1
>        Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>            Reporter: Hailong Wen
>
> I am from the IBM Platform Symphony team, and we are working to integrate Spark
> with our EGO resource manager to provide fine-grained dynamic allocation.
> We found a defect in the current implementation of BlockManager.doGetRemote():
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
>     require(blockId != null, "BlockId is null")
>     // Issue 2: this location list may already be out of date
>     val locations = Random.shuffle(master.getLocations(blockId))
>     for (loc <- locations) {
>       logDebug(s"Getting remote block $blockId from $loc")
>       // Issue 1: this call is not guarded by a try/catch
>       val data = blockTransferService.fetchBlockSync(
>         loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
>       if (data != null) {
>         if (asBlockResult) {
>           return Some(new BlockResult(
>             dataDeserialize(blockId, data),
>             DataReadMethod.Network,
>             data.limit()))
>         } else {
>           return Some(data)
>         }
>       }
>       logDebug(s"The value of block $blockId is null")
>     }
>     logDebug(s"Block $blockId not found")
>     None
>   }
> {noformat}
> * Issue 1: Although the block fetch uses a "for" loop to try all available
> locations, the fetch
method is not guarded by a "Try" block. When an exception
> occurs, this method directly throws the error instead of trying the other
> block locations. The uncaught exception causes a task failure.
> * Issue 2: The "locations" list is acquired once before fetching, but in a
> dynamic allocation environment the block locations may change while the loop runs.
> We hit both issues in our use case, where Executors exit after all their
> assigned tasks are done. We *occasionally* get the following error (Issue 1):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
> 15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
>         at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>         at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>         at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>         at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>         at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
>         at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>         at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:599)
>         at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:597)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:597)
>         at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:591)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
>         at scala.Option.orElse(Option.scala:257)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1149)
>         at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>         at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>         at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>         at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> Caused by: java.net.ConnectException: Connection refused: sparkbj01/9.111.254.195:37599
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
>         at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>         at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>         ... 1 more
> 15/05/13 10:28:35 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
> 15/05/13 10:28:40 DEBUG TransportClientFactory: Creating new connection to sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>         at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>         at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>         at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>         at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>         at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> Caused by: java.net.ConnectException: Connection refused: sparkbj01/9.111.254.195:37599
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
>         at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>         at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>         ... 1 more
> {noformat}
> We did send "ExecutorLost" messages so that BlockManagerMaster could remove the
> executor from its block location map, but due to network latency the
> "getLocations" call may happen before the removal.
> In our heavy-workload environment, some tasks may keep failing and finally
> cause *job failure*.
> Using HttpBroadcast instead of the default TorrentBroadcast did help to work
> around this problem, but we want better performance. So we added a Try block,
> only to find that the "for" loop tries dozens of dead executors before finally
> fetching the block from the driver's BlockManager. This process takes *several
> minutes*.
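The behavior we would like (try each currently known location once, in random order, skip dead executors, and let the caller refresh the location list before retrying) can be sketched outside Spark as follows. This is only an illustration in Java; the `Fetcher` interface and the location strings are hypothetical stand-ins for `blockTransferService.fetchBlockSync` and the `BlockManagerId` list, not Spark's actual API:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class GuardedFetch {

    /** Hypothetical stand-in for blockTransferService.fetchBlockSync(...). */
    public interface Fetcher {
        byte[] fetch(String location) throws IOException;
    }

    /**
     * Try every known location once, in random order. A failed location is
     * logged and skipped instead of aborting the whole lookup (Issue 1);
     * the caller can refresh the location list and call again, so stale
     * entries are not retried forever (Issue 2). Returns null when every
     * location fails.
     */
    public static byte[] fetchWithFallback(List<String> locations, Fetcher fetcher) {
        List<String> shuffled = new ArrayList<>(locations);
        Collections.shuffle(shuffled);
        for (String loc : shuffled) {
            try {
                byte[] data = fetcher.fetch(loc);  // guarded: exceptions no longer escape
                if (data != null) {
                    return data;
                }
            } catch (IOException e) {
                // A dead executor: log and fall through to the next location.
                System.err.println("Failed to fetch from " + loc + ": " + e.getMessage());
            }
        }
        return null;
    }
}
```

With this shape, one pass over a location list containing two dead executors and the driver still succeeds, and a full failure surfaces as a null result the caller can act on rather than an uncaught exception that kills the task.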
> We are now working around this problem with the following fix:
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
>     require(blockId != null, "BlockId is null")
>     var blockFetched = false
>     while (!blockFetched) {
>       // Re-query the (possibly updated) locations on every attempt
>       val locations = Random.shuffle(master.getLocations(blockId))
>       val loc = locations.head
>       logDebug(s"Getting remote block $blockId from $loc")
>       // Guard the fetch so that a dead executor does not fail the task
>       val dataTry = Try(blockTransferService.fetchBlockSync(
>         loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer())
>
>       dataTry match {
>         case Success(data) =>
>           if (data != null) {
>             if (asBlockResult) {
>               return Some(new BlockResult(
>                 dataDeserialize(blockId, data),
>                 DataReadMethod.Network,
>                 data.limit()))
>             } else {
>               return Some(data)
>             }
>           }
>           logDebug(s"The value of block $blockId is null")
>         case Failure(e) =>
>           logWarning(s"Failed to fetch block ${blockId.toString} from ${loc.host}:" +
>             s"${loc.port} executorId:${loc.executorId}. " +
>             { if (locations.size <= 1) "" else "Will update locations and retry." })
>       }
>       // If no more than 1 location is left to fetch from (the driver),
>       // stop retrying and exit.
>       blockFetched = (locations.size <= 1)
>     }
>     logDebug(s"Block $blockId not found")
>     None
>   }
> {noformat}
> This fix suppresses the exception when a fetch fails and refreshes the
> locations to reduce future failures.
> We hope to get help from experts in the community on a more thorough solution
> (e.g., can we try all available block locations in a random rolling manner,
> instead of retrying the same location 4 times consecutively?)

-- This message was sent by Atlassian JIRA (v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org