Re: Launching Spark app in client mode for standalone cluster
Thanks for the pointers. The issue was due to route caching by Spray, which would always return the same value. Other than that the program is working fine. On Mon, Jan 5, 2015 at 12:44 AM, Simon Chan simonc...@gmail.com wrote: Boromir, You may like to take a look at how we make Spray and Spark working together at the PredictionIO project: https://github.com/PredictionIO/PredictionIO Simon On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work ches...@alpinenow.com wrote: Just a guess here, may not be correct. Spray needs to start akka actor system; spark context also creates an akka actor system, is it possible there are some conflict ? Sent from my iPad On Jan 4, 2015, at 7:42 PM, Boromir Widas vcsub...@gmail.com wrote: Hello, I am trying to launch a Spark app(client mode for standalone cluster) from a Spray server, using the following code. When I run it as $ java -cp class paths SprayServer the SimpleApp.getA() call from SprayService returns -1(which means it sees the logData RDD as null for HTTP requests), but the statements from within SimpleAppLoader.run() get correct values from SimpleApp.getA(). Any idea why the HTTP requests do not see the cached RDD? I have been trying to debug this for some time but not getting anywhere - any pointers will be greatly appreciated. Thanks. BEGIN SPRAY SERVER import akka.actor.{ActorSystem, Props} import akka.io.IO import spray.can.Http import akka.actor._ import spray.routing.HttpService import scala.concurrent.ops object SprayServer { def main(args: Array[String]) { // we need an ActorSystem to host our service implicit val system = ActorSystem() //create our service actor val service = system.actorOf(Props[SprayServiceActor], test-service) //bind our actor to an HTTP port IO(Http) ! 
Http.Bind(service, interface = "0.0.0.0", port = 8085)
    ops.spawn { SimpleAppLoader.run() }  // DOES NOT WORK from HTTP requests
  }
}

class SprayServiceActor extends SprayService with Actor {
  // the HttpService trait (which SprayService will extend) defines
  // only one abstract member, which connects the service's environment
  // to the enclosing actor or test.
  def actorRefFactory = context
  def receive = runRoute(rootRoute)
}

trait SprayService extends HttpService {
  def default = path("") {
    println("handling default route")
    val numAs = SimpleApp.getA() // DOES NOT WORK
    get { complete(s"num A: $numAs") }
  }
  def pingRoute = path("ping") { get { complete("pong!") } }
  def pongRoute = path("pong") { get { complete("pong!?") } }
  def rootRoute = pingRoute ~ pongRoute ~ default
}

// END SPRAY, BEGIN SPARK
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.rdd.RDD

object SimpleApp {
  var resultString: String = "Data not assigned"
  var logData: RDD[String] = null

  def main(args: Array[String]) {
    val logFile = "/home/ovik/src/spark/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
    println(resultString)
  }

  def getA(): Int = {
    println(resultString)
    if (null == logData) {
      println("logData is null!")
      -1
    } else {
      val numAs = logData.filter(line => line.contains("a")).count().toInt
      println(s"numAs: $numAs")
      numAs
    }
  }
}

object SimpleAppLoader {
  def main(args: Array[String]) {
    run()
  }

  def run() {
    val clArgs = Array(
      "--deploy-mode", "client",
      "--total-executor-cores", "2",
      "--class", "SimpleApp",
      "--conf", "spark.shuffle.spill=false",
      "--conf", "spark.master=spark://troika:7077",
      "--conf", "spark.driver.memory=128m",
      "--conf", "spark.executor.memory=128m",
      "--conf", "spark.eventLog.enabled=true",
      "--conf", "spark.eventLog.dir=/home/ovik/logs",
      SparkContext.jarOfClass(this.getClass).get)
    SparkSubmit.main(clArgs)
    val numAs = SimpleApp.getA() // WORKS
    println(s"numAs is $numAs")
  }
}
Launching Spark app in client mode for standalone cluster
Hello, I am trying to launch a Spark app(client mode for standalone cluster) from a Spray server, using the following code. When I run it as $ java -cp class paths SprayServer the SimpleApp.getA() call from SprayService returns -1(which means it sees the logData RDD as null for HTTP requests), but the statements from within SimpleAppLoader.run() get correct values from SimpleApp.getA(). Any idea why the HTTP requests do not see the cached RDD? I have been trying to debug this for some time but not getting anywhere - any pointers will be greatly appreciated. Thanks. BEGIN SPRAY SERVER import akka.actor.{ActorSystem, Props} import akka.io.IO import spray.can.Http import akka.actor._ import spray.routing.HttpService import scala.concurrent.ops object SprayServer { def main(args: Array[String]) { // we need an ActorSystem to host our service implicit val system = ActorSystem() //create our service actor val service = system.actorOf(Props[SprayServiceActor], test-service) //bind our actor to an HTTP port IO(Http) ! Http.Bind(service, interface = 0.0.0.0, port = 8085) ops.spawn { *SimpleAppLoader.run() * } } } class SprayServiceActor extends SprayService with Actor { // the HttpService trait (which SprayService will extend) defines // only one abstract member, which connects the services environment // to the enclosing actor or test. def actorRefFactory = context def receive = runRoute(rootRoute) } trait SprayService extends HttpService { def default = path() { println(handling default route) val numAs = *SimpleApp.getA() // DOES NOT WORK * get { complete(snum A: $numAs) } } def pingRoute = path(ping) { get { complete(pong!) } } def pongRoute = path(pong) { get { complete(pong!?) 
} } def rootRoute = pingRoute ~ pongRoute ~ default } // END SPRAY, BEGIN SPARK import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmit import org.apache.spark.rdd.RDD object SimpleApp { var resultString: String = Data not assigned var logData: RDD[String] = null def main(args: Array[String]) { val logFile = /home/ovik/src/spark/README.md // Should be some file on your system val conf = new SparkConf().setAppName(Simple Application) val sc = new SparkContext(conf) logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line = line.contains(a)).count() val numBs = logData.filter(line = line.contains(b)).count() resultString = Lines with a: %s, Lines with b: %s.format(numAs, numBs) println(resultString) } def getA(): Int = { println(resultString) if(null == logData) { println( logData is null!) -1 } else { val numAs = logData.filter(line = line.contains(a)).count().toInt println(s numAs: $numAs) numAs } } } object SimpleAppLoader { def main(args: Array[String]) { run() } def run() { val clArgs = Array( --deploy-mode, client , --total-executor-cores, 2 , --class, SimpleApp , --conf, spark.shuffle.spill=false , --conf, spark.master=spark://troika:7077 , --conf, spark.driver.memory=128m , --conf, spark.executor.memory=128m , --conf, spark.eventLog.enabled=true , --conf, spark.eventLog.dir=/home/ovik/logs , SparkContext.jarOfClass(this.getClass).get) SparkSubmit.main(clArgs) val numAs = *SimpleApp.getA()// WORKS * println(snumAs is $numAs) } }
Re: Launching Spark app in client mode for standalone cluster
Boromir, You may like to take a look at how we make Spray and Spark working together at the PredictionIO project: https://github.com/PredictionIO/PredictionIO Simon On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work ches...@alpinenow.com wrote: Just a guess here, may not be correct. Spray needs to start akka actor system; spark context also creates an akka actor system, is it possible there are some conflict ? Sent from my iPad On Jan 4, 2015, at 7:42 PM, Boromir Widas vcsub...@gmail.com wrote: Hello, I am trying to launch a Spark app(client mode for standalone cluster) from a Spray server, using the following code. When I run it as $ java -cp class paths SprayServer the SimpleApp.getA() call from SprayService returns -1(which means it sees the logData RDD as null for HTTP requests), but the statements from within SimpleAppLoader.run() get correct values from SimpleApp.getA(). Any idea why the HTTP requests do not see the cached RDD? I have been trying to debug this for some time but not getting anywhere - any pointers will be greatly appreciated. Thanks. BEGIN SPRAY SERVER import akka.actor.{ActorSystem, Props} import akka.io.IO import spray.can.Http import akka.actor._ import spray.routing.HttpService import scala.concurrent.ops object SprayServer { def main(args: Array[String]) { // we need an ActorSystem to host our service implicit val system = ActorSystem() //create our service actor val service = system.actorOf(Props[SprayServiceActor], test-service) //bind our actor to an HTTP port IO(Http) ! Http.Bind(service, interface = 0.0.0.0, port = 8085) ops.spawn { *SimpleAppLoader.run() * } } } class SprayServiceActor extends SprayService with Actor { // the HttpService trait (which SprayService will extend) defines // only one abstract member, which connects the services environment // to the enclosing actor or test. 
def actorRefFactory = context def receive = runRoute(rootRoute) } trait SprayService extends HttpService { def default = path() { println(handling default route) val numAs = *SimpleApp.getA() // DOES NOT WORK * get { complete(snum A: $numAs) } } def pingRoute = path(ping) { get { complete(pong!) } } def pongRoute = path(pong) { get { complete(pong!?) } } def rootRoute = pingRoute ~ pongRoute ~ default } // END SPRAY, BEGIN SPARK import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmit import org.apache.spark.rdd.RDD object SimpleApp { var resultString: String = Data not assigned var logData: RDD[String] = null def main(args: Array[String]) { val logFile = /home/ovik/src/spark/README.md // Should be some file on your system val conf = new SparkConf().setAppName(Simple Application) val sc = new SparkContext(conf) logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line = line.contains(a)).count() val numBs = logData.filter(line = line.contains(b)).count() resultString = Lines with a: %s, Lines with b: %s.format(numAs, numBs) println(resultString) } def getA(): Int = { println(resultString) if(null == logData) { println( logData is null!) -1 } else { val numAs = logData.filter(line = line.contains(a)).count().toInt println(s numAs: $numAs) numAs } } } object SimpleAppLoader { def main(args: Array[String]) { run() } def run() { val clArgs = Array( --deploy-mode, client , --total-executor-cores, 2 , --class, SimpleApp , --conf, spark.shuffle.spill=false , --conf, spark.master=spark://troika:7077 , --conf, spark.driver.memory=128m , --conf, spark.executor.memory=128m , --conf, spark.eventLog.enabled=true , --conf, spark.eventLog.dir=/home/ovik/logs , SparkContext.jarOfClass(this.getClass).get) SparkSubmit.main(clArgs) val numAs = *SimpleApp.getA()// WORKS * println(snumAs is $numAs) } }