spark core api vs. google cloud dataflow

2016-02-23 Thread lonely Feb
Google Cloud Dataflow provides a distributed dataset called PCollection,
and syntactic sugar on top of PCollection in the form of "apply". Note
that "apply" is different from the Spark API's "map", which passes each
element of the source through a function func. Can Spark support this
kind of syntactic sugar, and if not, why?
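
For illustration, a minimal sketch of how a Dataflow-style whole-dataset
"apply" could be layered onto RDDs with an implicit class. This is not
part of the Spark API; ApplySugar and applyTransform are hypothetical
names used only to make the comparison concrete:

  import org.apache.spark.rdd.RDD

  // A Dataflow-style "apply" takes a whole-dataset transform
  // (RDD => RDD), unlike "map", which takes a per-element function.
  object ApplySugar {
    implicit class TransformableRDD[T](rdd: RDD[T]) {
      def applyTransform[U](transform: RDD[T] => RDD[U]): RDD[U] =
        transform(rdd)
    }
  }

  // Usage sketch: compose reusable dataset-level transforms as values,
  // much like Dataflow's PCollection.apply(PTransform).
  //   import ApplySugar._
  //   val evens: RDD[Int] => RDD[Int] = _.filter(_ % 2 == 0)
  //   val result = sc.parallelize(1 to 10).applyTransform(evens)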


Spark SQL cannot tolerate regexp with BIGINT

2015-04-29 Thread lonely Feb
Hi all, we are migrating our Hive jobs to Spark SQL, but we found a small
difference between Hive and Spark SQL: our SQL has a statement like:

select A from B where id regexp '^12345$'

In Hive it works fine, but in Spark SQL we get:

java.lang.ClassCastException: java.lang.Long cannot be cast to
java.lang.String

Can this statement be handled with Spark SQL?
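
A common workaround is to make the coercion explicit so the regexp
operator sees a STRING instead of a BIGINT, matching the implicit cast
Hive performs. A sketch, assuming a HiveContext named sqlContext and the
table/column names from the query above:

  // Cast the BIGINT id column to a string before the regexp match.
  val df = sqlContext.sql(
    "select A from B where cast(id as string) regexp '^12345$'")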


Re: broadcast hang out

2015-03-15 Thread lonely Feb
yes

2015-03-16 11:43 GMT+08:00 Mridul Muralidharan mri...@gmail.com:

 Cross region as in different data centers?

 - Mridul

 On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote:
  Hi all, I ran into a problem where torrent broadcast hangs in my
  Spark cluster (1.2, standalone), particularly when the driver and
  executors are cross-region. Reading the broadcast code, I found a
  synchronous block read here:
 
  def fetchBlockSync(host: String, port: Int, execId: String,
      blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })

    Await.result(result.future, Duration.Inf)
  }
 
  It seems the fetchBlockSync method has no timeout limit and waits
  forever. Can anybody show me how to control the timeout here?



Re: broadcast hang out

2015-03-15 Thread lonely Feb
Thanks. But this method is in Spark's BlockTransferService.scala, which I
cannot replace without rewriting the core code. I wonder if a timeout is
already handled somewhere.
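
As a stopgap, one sketch (user code only, nothing here is Spark API) is
to bound the wait from the calling side by running the blocking call in
a Future and awaiting that with a finite duration; doFetch() below is a
hypothetical stand-in for whatever call triggers the broadcast fetch:

  import java.util.concurrent.Executors
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration._

  object BoundedWait {
    private val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

    // Await.result throws a TimeoutException once the duration elapses,
    // so a stalled cross-region fetch fails fast instead of hanging.
    // Note the underlying blocked thread keeps running after the timeout.
    def withTimeout[T](timeout: FiniteDuration)(body: => T): T =
      Await.result(Future(body)(ec), timeout)
  }

  // Usage sketch:
  //   BoundedWait.withTimeout(30.seconds) { doFetch() }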

2015-03-16 11:27 GMT+08:00 Chester Chen ches...@alpinenow.com:

 can you just replace Duration.Inf with a shorter duration? How about

   import scala.concurrent.duration._
   import akka.util.Timeout

   val timeout = new Timeout(10.seconds)
   Await.result(result.future, timeout.duration)

   or

   import java.util.concurrent.TimeUnit
   import scala.concurrent.duration.FiniteDuration

   val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
   Await.result(result.future, timeout)

   or simply

   import scala.concurrent.duration._
   Await.result(result.future, 10.seconds)



 On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote:

 Hi all, I ran into a problem where torrent broadcast hangs in my
 Spark cluster (1.2, standalone), particularly when the driver and
 executors are cross-region. Reading the broadcast code, I found a
 synchronous block read here:

  def fetchBlockSync(host: String, port: Int, execId: String,
      blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })

    Await.result(result.future, Duration.Inf)
  }

 It seems the fetchBlockSync method has no timeout limit and waits
 forever. Can anybody show me how to control the timeout here?





broadcast hang out

2015-03-15 Thread lonely Feb
Hi all, I ran into a problem where torrent broadcast hangs in my Spark
cluster (1.2, standalone), particularly when the driver and executors are
cross-region. Reading the broadcast code, I found a synchronous block
read here:

  def fetchBlockSync(host: String, port: Int, execId: String,
      blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })

    Await.result(result.future, Duration.Inf)
  }

It seems the fetchBlockSync method has no timeout limit and waits
forever. Can anybody show me how to control the timeout here?


Re: broadcast hang out

2015-03-15 Thread lonely Feb
Can anyone help? Thanks a lot!

2015-03-16 11:45 GMT+08:00 lonely Feb lonely8...@gmail.com:

 yes

 2015-03-16 11:43 GMT+08:00 Mridul Muralidharan mri...@gmail.com:

 Cross region as in different data centers?

 - Mridul

 On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb lonely8...@gmail.com wrote:
  Hi all, I ran into a problem where torrent broadcast hangs in my
  Spark cluster (1.2, standalone), particularly when the driver and
  executors are cross-region. Reading the broadcast code, I found a
  synchronous block read here:
 
  def fetchBlockSync(host: String, port: Int, execId: String,
      blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })

    Await.result(result.future, Duration.Inf)
  }
 
  It seems the fetchBlockSync method has no timeout limit and waits
  forever. Can anybody show me how to control the timeout here?