Thanks. But this method is in BlockTransferService.scala in Spark, which I
cannot replace without rewriting the core code. I wonder if it is already
handled somewhere.
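Since the method itself can't be edited, one workaround at the call site is to wrap the blocking wait in a Future of its own and bound it with a finite `Await.result`. A minimal, self-contained sketch of that idea (`slowFetch` here is a hypothetical stand-in for the blocking fetch, not Spark's real API):

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimeoutSketch {
  // Hypothetical stand-in for a blocking block fetch; not Spark's method.
  def slowFetch(delayMs: Long): String = { Thread.sleep(delayMs); "block-data" }

  def main(args: Array[String]): Unit = {
    // Bounded wait that completes well inside the 2-second limit.
    val ok = Await.result(Future(slowFetch(100)), 2.seconds)
    println(ok)

    // Same call bounded at 50 ms: fails fast instead of hanging forever,
    // which is the behavior Duration.Inf prevents.
    try {
      Await.result(Future(slowFetch(500)), 50.millis)
      println("no timeout")
    } catch {
      case _: TimeoutException => println("timed out")
    }
  }
}
```

The same pattern could wrap the caller of fetchBlockSync from the outside, though it leaves the underlying fetch thread running after the timeout.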

2015-03-16 11:27 GMT+08:00 Chester Chen <ches...@alpinenow.com>:

> can you just replace "Duration.Inf" with a shorter duration? How about
>
>       import scala.concurrent.duration._
>       import akka.util.Timeout
>       val timeout = new Timeout(10 seconds)
>       Await.result(result.future, timeout.duration)
>
>       or
>
>       import java.util.concurrent.TimeUnit
>       import scala.concurrent.duration.FiniteDuration
>       val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
>       Await.result(result.future, timeout)
>
>       or simply
>       import scala.concurrent.duration._
>       Await.result(result.future, 10 seconds)
>
>
>
> On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <lonely8...@gmail.com> wrote:
>
>> Hi all, I have run into a problem where torrent broadcast hangs in my
>> Spark cluster (1.2, standalone), which is particularly serious when the
>> driver and executors are cross-region. Reading the broadcast code, I
>> found a synchronous block read here:
>>
>>   def fetchBlockSync(host: String, port: Int, execId: String,
>>       blockId: String): ManagedBuffer = {
>>     // A monitor for the thread to wait on.
>>     val result = Promise[ManagedBuffer]()
>>     fetchBlocks(host, port, execId, Array(blockId),
>>       new BlockFetchingListener {
>>         override def onBlockFetchFailure(
>>             blockId: String, exception: Throwable): Unit = {
>>           result.failure(exception)
>>         }
>>         override def onBlockFetchSuccess(
>>             blockId: String, data: ManagedBuffer): Unit = {
>>           val ret = ByteBuffer.allocate(data.size.toInt)
>>           ret.put(data.nioByteBuffer())
>>           ret.flip()
>>           result.success(new NioManagedBuffer(ret))
>>         }
>>       })
>>
>>     Await.result(result.future, Duration.Inf)
>>   }
>>
>> It seems that fetchBlockSync does not have a timeout limit but waits
>> forever. Can anybody show me how to control the timeout here?
>>
>
>
