Thanks. But this method is in BlockTransferService.scala inside Spark itself, so I cannot replace it without rewriting core code. I wonder whether this is already handled somewhere.
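
For reference, here is a minimal sketch of bounding a blocking call from the outside, without editing the method itself. `withTimeout` is a hypothetical helper, not part of Spark, and the `Thread.sleep` is only a stand-in for a fetch that hangs. It stops the caller from waiting forever, but the underlying thread stays blocked until the call returns, so it would not remove the hang inside Spark.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimeoutSketch {
  // Hypothetical helper (not a Spark API): runs `body` on another thread
  // and waits at most `limit` for its result. Await.result throws a
  // TimeoutException if the limit is exceeded.
  def withTimeout[T](limit: FiniteDuration)(body: => T): T =
    Await.result(Future(blocking(body)), limit)

  def main(args: Array[String]): Unit = {
    // Stand-in for a blocking call that does not return in time.
    val outcome =
      try withTimeout(200.millis) { Thread.sleep(5000); "fetched" }
      catch { case _: TimeoutException => "timed out" }
    println(outcome)
  }
}
```

Whether this can be applied depends on where the blocking call is made. Here fetchBlockSync is called from within Spark, so only the user-level operation that triggers it could be wrapped this way.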

2015-03-16 11:27 GMT+08:00 Chester Chen <ches...@alpinenow.com>:

> Can you just replace "Duration.Inf" with a shorter duration? How about
>
>     import scala.concurrent.duration._
>     import akka.util.Timeout  // assumed: Timeout here is Akka's
>     val timeout = new Timeout(10 seconds)
>     Await.result(result.future, timeout.duration)
>
> or
>
>     import java.util.concurrent.TimeUnit
>     val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
>     Await.result(result.future, timeout)
>
> or simply
>
>     import scala.concurrent.duration._
>     Await.result(result.future, 10 seconds)
>
> On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <lonely8...@gmail.com> wrote:
>
>> Hi all, I have run into a problem where torrent broadcast hangs in my
>> Spark cluster (1.2, standalone). It is particularly serious when the
>> driver and executors are cross-region. When I read the broadcast code, I
>> found a synchronous block read here:
>>
>>   def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
>>     // A monitor for the thread to wait on.
>>     val result = Promise[ManagedBuffer]()
>>     fetchBlocks(host, port, execId, Array(blockId),
>>       new BlockFetchingListener {
>>         override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
>>           result.failure(exception)
>>         }
>>         override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
>>           val ret = ByteBuffer.allocate(data.size.toInt)
>>           ret.put(data.nioByteBuffer())
>>           ret.flip()
>>           result.success(new NioManagedBuffer(ret))
>>         }
>>       })
>>
>>     Await.result(result.future, Duration.Inf)
>>   }
>>
>> It seems that the fetchBlockSync method has no timeout limit and waits
>> forever. Can anybody show me how to control the timeout here?