Re: Launching Spark app in client mode for standalone cluster

2015-01-06, Boromir Widas
Thanks for the pointers. The issue was due to route caching by Spray, which
would always return the same value. Other than that, the program is working
fine.
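
For reference, a minimal sketch of the kind of fix this implies (an
assumption based on spray-routing semantics, not necessarily the exact
change made): the route tree passed to runRoute is built once, so a val
computed in the route body is evaluated a single time and its result is
reused for every request. Moving the call inside complete, which takes its
argument by name (or wrapping the inner route in spray's dynamic directive),
makes it run per request:

  def default = path("") {
    get {
      complete {
        // evaluated on every request, not once at route construction
        val numAs = SimpleApp.getA()
        s"num A: $numAs"
      }
    }
  }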

On Mon, Jan 5, 2015 at 12:44 AM, Simon Chan simonc...@gmail.com wrote:

 Boromir,

 You may like to take a look at how we make Spray and Spark work together
 in the PredictionIO project:
 https://github.com/PredictionIO/PredictionIO



 Simon

 On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work ches...@alpinenow.com
 wrote:

 Just a guess here; it may not be correct.

   Spray needs to start an Akka actor system; the Spark context also creates
 an Akka actor system. Is it possible there is some conflict?
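
A minimal sketch bearing on that guess (assuming Spark 1.x, where the driver
starts an internal ActorSystem named "sparkDriver"): two actor systems can
coexist in one JVM as long as each is its own named instance, so the
Spray-side system can be given an explicit name to keep the two clearly
apart.

  import akka.actor.ActorSystem

  // Spray's own system, explicitly named; Spark 1.x creates a separate
  // "sparkDriver" ActorSystem internally when the SparkContext starts.
  implicit val system = ActorSystem("spray-service-system")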




Launching Spark app in client mode for standalone cluster

2015-01-04, Boromir Widas
Hello,

I am trying to launch a Spark app (client mode on a standalone cluster) from
a Spray server, using the following code.

When I run it as

$ java -cp <class paths> SprayServer

the SimpleApp.getA() call from SprayService returns -1 (which means it sees
the logData RDD as null for HTTP requests), but the statements within
SimpleAppLoader.run() get correct values from SimpleApp.getA().

Any idea why the HTTP requests do not see the cached RDD? I have been trying
to debug this for some time without getting anywhere; any pointers will be
greatly appreciated.

Thanks.

// BEGIN SPRAY SERVER

import akka.actor.{ActorSystem, Props}
import akka.io.IO
import spray.can.Http
import akka.actor._
import spray.routing.HttpService
import scala.concurrent.ops

object SprayServer {

  def main(args: Array[String]) {
    // we need an ActorSystem to host our service
    implicit val system = ActorSystem()

    // create our service actor
    val service = system.actorOf(Props[SprayServiceActor], "test-service")

    // bind our actor to an HTTP port
    IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)

    // launch the Spark driver on a separate thread
    ops.spawn {
      SimpleAppLoader.run()
    }
  }
}

class SprayServiceActor extends SprayService with Actor {
  // the HttpService trait (which SprayService extends) defines
  // only one abstract member, which connects the service's environment
  // to the enclosing actor or test.
  def actorRefFactory = context

  def receive = runRoute(rootRoute)
}

trait SprayService extends HttpService {

  def default = path("") {
    println("handling default route")
    val numAs = SimpleApp.getA()   // DOES NOT WORK
    get { complete(s"num A: $numAs") }
  }

  def pingRoute = path("ping") {
    get { complete("pong!") }
  }

  def pongRoute = path("pong") {
    get { complete("pong!?") }
  }

  def rootRoute = pingRoute ~ pongRoute ~ default
}

// END SPRAY, BEGIN SPARK

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.rdd.RDD

object SimpleApp {

  var resultString: String = "Data not assigned"

  var logData: RDD[String] = null

  def main(args: Array[String]) {
    val logFile = "/home/ovik/src/spark/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
    println(resultString)
  }

  def getA(): Int = {
    println(resultString)
    if (null == logData) {
      println(" logData is null!")
      -1
    } else {
      val numAs = logData.filter(line => line.contains("a")).count().toInt
      println(s" numAs: $numAs")
      numAs
    }
  }
}

object SimpleAppLoader {

  def main(args: Array[String]) {
    run()
  }

  def run() {
    val clArgs = Array(
      "--deploy-mode", "client"
      , "--total-executor-cores", "2"
      , "--class", "SimpleApp"
      , "--conf", "spark.shuffle.spill=false"
      , "--conf", "spark.master=spark://troika:7077"
      , "--conf", "spark.driver.memory=128m"
      , "--conf", "spark.executor.memory=128m"
      , "--conf", "spark.eventLog.enabled=true"
      , "--conf", "spark.eventLog.dir=/home/ovik/logs"
      , SparkContext.jarOfClass(this.getClass).get)

    SparkSubmit.main(clArgs)

    val numAs = SimpleApp.getA()   // WORKS

    println(s"numAs is $numAs")
  }
}


Re: Launching Spark app in client mode for standalone cluster

2015-01-04, Simon Chan
Boromir,

You may like to take a look at how we make Spray and Spark work together
in the PredictionIO project: https://github.com/PredictionIO/PredictionIO



Simon

On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work ches...@alpinenow.com
wrote:

 Just a guess here; it may not be correct.

   Spray needs to start an Akka actor system; the Spark context also creates
 an Akka actor system. Is it possible there is some conflict?


