[ 
https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chanh Le updated SPARK-7703:
----------------------------
    Comment: was deleted

(was: Any update on that? 
I have the same error too.
java.io.IOException: org.apache.spark.storage.BlockFetchException: Failed to 
fetch block from 1 locations. Most recent failure cause:
https://gist.github.com/giaosudau/3f7087707dcabc53c3b3bf54b0503720)

> Task failure caused by block fetch failure in BlockManager.doGetRemote() when 
> using TorrentBroadcast
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7703
>                 URL: https://issues.apache.org/jira/browse/SPARK-7703
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.1, 1.3.1
>         Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>            Reporter: Hailong Wen
>
> I am from IBM Platform Symphony team and we are working to integration Spark 
> with our EGO to provide a fine-grained dynamic allocation Resource Manager. 
> We found a defect in current implementation of BlockManager.doGetRemote():
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): 
> Option[Any] = {
>     require(blockId != null, "BlockId is null")
>     val locations = Random.shuffle(master.getLocations(blockId))     
> <--------------- Issue2: locations may be out of date
>     for (loc <- locations) {
>       logDebug(s"Getting remote block $blockId from $loc")
>       val data = blockTransferService.fetchBlockSync(
>         loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() 
>      <--------------- Issue1: This statement is not in try/catch
>       if (data != null) {
>         if (asBlockResult) {
>           return Some(new BlockResult(
>             dataDeserialize(blockId, data),
>             DataReadMethod.Network,
>             data.limit()))
>         } else {
>           return Some(data)
>         }
>       }
>       logDebug(s"The value of block $blockId is null")
>     }
>     logDebug(s"Block $blockId not found")
>     None
>   }
> {noformat}
> * Issue 1: Although the block fetch uses "for" to try all available 
> locations, the fetch method is not guarded by a "Try" block. When exception 
> occurs, this method will directly throw the error instead of trying other 
> block locations. The uncaught exception will cause task failure.
> * Issue 2: Constant "location" is acquired before fetching, however in a 
> dynamic allocation environment the block locations may change.
> We hit the above 2 issues in our use case, where Executors exit after all its 
> assigned tasks are done. We *occasionally* get the following error (issue 1.):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of 
> broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered 
> locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from 
> sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
> 15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to 
> sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
>       at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>       at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>       at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>       at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>       at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>       at 
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
>       at 
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>       at 
> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:599)
>       at 
> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:597)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:597)
>       at 
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:591)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
>       at scala.Option.orElse(Option.scala:257)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
>       at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1149)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>       at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>       at java.lang.Thread.run(Thread.java:662)
> Caused by: java.net.ConnectException: Connection refused: 
> sparkbj01/9.111.254.195:37599
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
>       at 
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>       at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>       ... 1 more
> 15/05/13 10:28:35 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 
> outstanding blocks after 5000 ms
> 15/05/13 10:28:40 DEBUG TransportClientFactory: Creating new connection to 
> sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:40 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks (after 1 retries)
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
>       at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>       at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>       at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>       at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>       at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>       at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>       at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>       at java.lang.Thread.run(Thread.java:662)
> Caused by: java.net.ConnectException: Connection refused: 
> sparkbj01/9.111.254.195:37599
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
>       at 
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>       at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>       ... 1 more
> {noformat}
> We did send "ExecutorLost" messages so that BlockManagerMaster can remove the 
> executor from its block location map. But due to network latency the 
> "getLocation" call may happen before the removal. 
> In our heavy workload environment, some tasks may keep fail and finally 
> causes *job failure*.
> Using HttpBroadcast instead of default TorrentBroadcast did help to resolve 
> this problem but we want better performance. So we added a Try block but 
> found that the "for" loop will try dozens of dead executor before finally 
> fetched the block from driver's BlockManager. This process takes *several 
> minutes*.
> We are now working around this problem by the following fix:
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): 
> Option[Any] = {
>     require(blockId != null, "BlockId is null")
>     var blockFetched = false
>     while (!blockFetched) {
>       val locations = Random.shuffle(master.getLocations(blockId))
>       val loc = locations.head
>       logDebug(s"Getting remote block $blockId from $loc")
>       val dataTry = Try(blockTransferService.fetchBlockSync(
>         loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer())
>       
>       dataTry match {
>         case Success(data) =>
>           if (data != null) {
>             if (asBlockResult) {
>               return Some(new BlockResult(
>                 dataDeserialize(blockId, data),
>                 DataReadMethod.Network,
>                 data.limit()))
>             } else {
>               return Some(data)
>             }
>           }
>           logDebug(s"The value of block $blockId is null")
>         case Failure(e) =>
>           logWarning(s"Failed to fetch block ${blockId.toString} from 
> ${loc.host}:"
>                      + s"${loc.port} executorId:${loc.executorId}. "
>                      + {
>                        if (locations.size <= 1) "" else "Will update 
> locations and retry."
>                      })
>       }
>       // If we have no more than 1 location to get from (the driver), we may 
> stop retrying and just exit.
>       blockFetched = (locations.size <= 1)
>     }
>     logDebug(s"Block $blockId not found")
>     None
>   }
> {noformat}
> This fix suppress the Exception when fetch fails, and update the location to 
> reduce future failures.
> We are expecting to get help from experts in the community to have a more 
> thorough solution (e.g., can we try all available block locations in a random 
> rolling manner, instead of re-trying the same location 4 times consecutively?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to