Re: broadcast hang out

2015-03-15 Thread Mridul Muralidharan
Cross region as in different data centers?

- Mridul

On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote:
 [...]

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: broadcast hang out

2015-03-15 Thread lonely Feb
yes

2015-03-16 11:43 GMT+08:00 Mridul Muralidharan mri...@gmail.com:

 Cross region as in different data centers ?

 - Mridul

 On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote:
  [...]



Re: broadcast hang out

2015-03-15 Thread lonely Feb
Thanks. But this method lives in BlockTransferService.scala in Spark core, which I
cannot replace without rewriting core code. I wonder whether the timeout is
already handled somewhere.

2015-03-16 11:27 GMT+08:00 Chester Chen ches...@alpinenow.com:

 can you just replace Duration.Inf with a shorter duration? [...]



 On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote:

 [...]





broadcast hang out

2015-03-15 Thread lonely Feb
Hi all, I have run into a problem where torrent broadcast hangs in my Spark
cluster (1.2, standalone); it is particularly serious when the driver and
executors are in different regions. Reading the broadcast code, I found a
synchronous block read here:

  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })

    Await.result(result.future, Duration.Inf)
  }

It seems that fetchBlockSync has no timeout limit and waits forever. Can
anybody show me how to control the timeout here?
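For readers skimming the archive, the pattern in question can be reproduced in isolation. `FetchListener` and `fetchAsync` below are hypothetical stand-ins for Spark's `BlockFetchingListener` and `fetchBlocks`; the point is that the callback completes a `Promise` and the caller blocks in `Await.result`, so the duration argument is the only thing bounding the wait.

```scala
import java.nio.ByteBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseSketch {
  // Hypothetical stand-in for Spark's BlockFetchingListener.
  trait FetchListener {
    def onSuccess(data: ByteBuffer): Unit
    def onFailure(e: Throwable): Unit
  }

  // Hypothetical stand-in for fetchBlocks: completes asynchronously on another thread.
  def fetchAsync(listener: FetchListener): Unit =
    new Thread(() => listener.onSuccess(ByteBuffer.wrap(Array[Byte](1, 2, 3)))).start()

  // Same shape as fetchBlockSync: bridge the async callback into a blocking call.
  def fetchSync(timeout: Duration): ByteBuffer = {
    val result = Promise[ByteBuffer]()
    fetchAsync(new FetchListener {
      override def onSuccess(data: ByteBuffer): Unit = result.success(data)
      override def onFailure(e: Throwable): Unit = result.failure(e)
    })
    // Duration.Inf reproduces the unbounded wait; a finite value bounds it.
    Await.result(result.future, timeout)
  }

  def main(args: Array[String]): Unit =
    println(fetchSync(10.seconds).remaining())  // prints 3
}
```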


Re: broadcast hang out

2015-03-15 Thread Chester Chen
Can you just replace Duration.Inf with a shorter duration? How about:

  import akka.util.Timeout
  import scala.concurrent.duration._
  val timeout = new Timeout(10.seconds)
  Await.result(result.future, timeout.duration)

or

  import java.util.concurrent.TimeUnit
  import scala.concurrent.duration.FiniteDuration
  val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
  Await.result(result.future, timeout)

or simply

  import scala.concurrent.duration._
  Await.result(result.future, 10.seconds)
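As an aside, the three forms above build the same bound (the akka `Timeout` wrapper just carries a `FiniteDuration`); a quick equality check, with arbitrary values:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

object DurationForms {
  // Postfix syntax, direct constructor, and the Duration factory are interchangeable.
  val a: FiniteDuration = 10.seconds
  val b: FiniteDuration = new FiniteDuration(10, TimeUnit.SECONDS)
  val c: FiniteDuration = Duration(10, TimeUnit.SECONDS)

  def allEqual: Boolean = a == b && b == c

  def main(args: Array[String]): Unit =
    println(allEqual)  // prints "true"
}
```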



On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote:

 [...]



Re: broadcast hang out

2015-03-15 Thread lonely Feb
Can anyone help? Thanks a lot!

2015-03-16 11:45 GMT+08:00 lonely Feb lonely8...@gmail.com:

 yes

 2015-03-16 11:43 GMT+08:00 Mridul Muralidharan mri...@gmail.com:

 Cross region as in different data centers ?

 - Mridul

 On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote:
  [...]