Re: broadcast hang out
Cross region as in different data centers ? - Mridul On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote: Hi all, i meet up with a problem that torrent broadcast hang out in my spark cluster (1.2, standalone) , particularly serious when driver and executors are cross-region. when i read the code of broadcast i found that a sync block read here: def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { // A monitor for the thread to wait on. val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), new BlockFetchingListener { override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { result.failure(exception) } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { val ret = ByteBuffer.allocate(data.size.toInt) ret.put(data.nioByteBuffer()) ret.flip() result.success(new NioManagedBuffer(ret)) } }) Await.result(result.future, Duration.Inf) } it seems that fetchBlockSync method does not have a timeout limit but wait forever ? Anybody can show me how to control the timeout here? - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: broadcast hang out
yes 2015-03-16 11:43 GMT+08:00 Mridul Muralidharan mri...@gmail.com: Cross region as in different data centers ? - Mridul On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote: Hi all, i meet up with a problem that torrent broadcast hang out in my spark cluster (1.2, standalone) , particularly serious when driver and executors are cross-region. when i read the code of broadcast i found that a sync block read here: def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { // A monitor for the thread to wait on. val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), new BlockFetchingListener { override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { result.failure(exception) } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { val ret = ByteBuffer.allocate(data.size.toInt) ret.put(data.nioByteBuffer()) ret.flip() result.success(new NioManagedBuffer(ret)) } }) Await.result(result.future, Duration.Inf) } it seems that fetchBlockSync method does not have a timeout limit but wait forever ? Anybody can show me how to control the timeout here?
Re: broadcast hang out
Thx. But this method is in BlockTransferService.scala of spark which i can not replace unless i rewrite the core code. I wonder if it is handled somewhere already. 2015-03-16 11:27 GMT+08:00 Chester Chen ches...@alpinenow.com: can you just replace Duration.Inf with a shorter duration ? how about import scala.concurrent.duration._ val timeout = new Timeout(10 seconds) Await.result(result.future, timeout.duration) or val timeout = new FiniteDuration(10, TimeUnit.SECONDS) Await.result(result.future, timeout) or simply import scala.concurrent.duration._ Await.result(result.future, 10 seconds) On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote: Hi all, i meet up with a problem that torrent broadcast hang out in my spark cluster (1.2, standalone) , particularly serious when driver and executors are cross-region. when i read the code of broadcast i found that a sync block read here: def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { // A monitor for the thread to wait on. val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), new BlockFetchingListener { override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { result.failure(exception) } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { val ret = ByteBuffer.allocate(data.size.toInt) ret.put(data.nioByteBuffer()) ret.flip() result.success(new NioManagedBuffer(ret)) } }) Await.result(result.future, Duration.Inf) } it seems that fetchBlockSync method does not have a timeout limit but wait forever ? Anybody can show me how to control the timeout here?
broadcast hang out
Hi all, i meet up with a problem that torrent broadcast hang out in my spark cluster (1.2, standalone) , particularly serious when driver and executors are cross-region. when i read the code of broadcast i found that a sync block read here: def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { // A monitor for the thread to wait on. val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), new BlockFetchingListener { override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { result.failure(exception) } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { val ret = ByteBuffer.allocate(data.size.toInt) ret.put(data.nioByteBuffer()) ret.flip() result.success(new NioManagedBuffer(ret)) } }) Await.result(result.future, Duration.Inf) } it seems that fetchBlockSync method does not have a timeout limit but wait forever ? Anybody can show me how to control the timeout here?
Re: broadcast hang out
can you just replace Duration.Inf with a shorter duration ? how about import scala.concurrent.duration._ val timeout = new Timeout(10 seconds) Await.result(result.future, timeout.duration) or val timeout = new FiniteDuration(10, TimeUnit.SECONDS) Await.result(result.future, timeout) or simply import scala.concurrent.duration._ Await.result(result.future, 10 seconds) On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote: Hi all, i meet up with a problem that torrent broadcast hang out in my spark cluster (1.2, standalone) , particularly serious when driver and executors are cross-region. when i read the code of broadcast i found that a sync block read here: def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { // A monitor for the thread to wait on. val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), new BlockFetchingListener { override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { result.failure(exception) } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { val ret = ByteBuffer.allocate(data.size.toInt) ret.put(data.nioByteBuffer()) ret.flip() result.success(new NioManagedBuffer(ret)) } }) Await.result(result.future, Duration.Inf) } it seems that fetchBlockSync method does not have a timeout limit but wait forever ? Anybody can show me how to control the timeout here?
Re: broadcast hang out
Anyone can help? Thanks a lot ! 2015-03-16 11:45 GMT+08:00 lonely Feb lonely8...@gmail.com: yes 2015-03-16 11:43 GMT+08:00 Mridul Muralidharan mri...@gmail.com: Cross region as in different data centers ? - Mridul On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote: Hi all, i meet up with a problem that torrent broadcast hang out in my spark cluster (1.2, standalone) , particularly serious when driver and executors are cross-region. when i read the code of broadcast i found that a sync block read here: def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { // A monitor for the thread to wait on. val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), new BlockFetchingListener { override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { result.failure(exception) } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { val ret = ByteBuffer.allocate(data.size.toInt) ret.put(data.nioByteBuffer()) ret.flip() result.success(new NioManagedBuffer(ret)) } }) Await.result(result.future, Duration.Inf) } it seems that fetchBlockSync method does not have a timeout limit but wait forever ? Anybody can show me how to control the timeout here?