Boromir, You may like to take a look at how we make Spray and Spark work together in the PredictionIO project: https://github.com/PredictionIO/PredictionIO
Simon On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work <ches...@alpinenow.com> wrote: > Just a guess here, may not be correct. > > Spray needs to start akka actor system; spark context also creates an > akka actor system, is it possible there are some conflict ? > > > > Sent from my iPad > > On Jan 4, 2015, at 7:42 PM, Boromir Widas <vcsub...@gmail.com> wrote: > > Hello, > > I am trying to launch a Spark app(client mode for standalone cluster) from > a Spray server, using the following code. > > When I run it as > > $> java -cp <class paths> SprayServer > > the SimpleApp.getA() call from SprayService returns -1(which means it > sees the logData RDD as null for HTTP requests), but the statements from > within SimpleAppLoader.run() get correct values from SimpleApp.getA(). > > Any idea why the HTTP requests do not see the cached RDD? I have been > trying to debug this for some time but not getting anywhere - any pointers > will be greatly appreciated. > > Thanks. > > //////////////////////// BEGIN SPRAY SERVER > > import akka.actor.{ActorSystem, Props} > > > > import akka.io.IO > > > > import spray.can.Http > > > > > > > > import akka.actor._ > > > > import spray.routing.HttpService > > > > import scala.concurrent.ops > > > > > > > > object SprayServer { > > > > def main(args: Array[String]) { > > > > // we need an ActorSystem to host our service > > > > implicit val system = ActorSystem() > > > > > > > > //create our service actor > > > > val service = system.actorOf(Props[SprayServiceActor], "test-service") > > > > > > > > //bind our actor to an HTTP port > > > > IO(Http) ! 
Http.Bind(service, interface = "0.0.0.0", port = 8085) > > > > > > > > ops.spawn { > > > > *SimpleAppLoader.run() * > > > > } > > > > } > > > > } > > > > > > > > class SprayServiceActor extends SprayService with Actor { > > > > // the HttpService trait (which SprayService will extend) defines > > > > // only one abstract member, which connects the services environment > > > > // to the enclosing actor or test. > > > > def actorRefFactory = context > > > > > > > > def receive = runRoute(rootRoute) > > > > } > > > > > > > > trait SprayService extends HttpService { > > > > > > > > def default = path("") { > > > > println("handling default route") > > > > val numAs = *SimpleApp.getA() // DOES NOT WORK * > > > > > get { complete(s"num A: $numAs") } > > > > } > > > > > > > > def pingRoute = path("ping") { > > > > get { complete("pong!") } > > > > } > > > > > > > > def pongRoute = path("pong") { > > > > get { complete("pong!?") } > > > > } > > > > > > > > def rootRoute = pingRoute ~ pongRoute ~ default > > > > } > > > > > > > > ////////// END SPRAY, BEGIN SPARK > > > > > > > > import org.apache.spark.SparkContext > > > > import org.apache.spark.SparkContext._ > > > > import org.apache.spark.SparkConf > > > > import org.apache.spark.deploy.SparkSubmit > > > > import org.apache.spark.rdd.RDD > > > > > > > > object SimpleApp { > > > > var resultString: String = "Data not assigned" > > > > var logData: RDD[String] = null > > > > def main(args: Array[String]) { > > > > val logFile = "/home/ovik/src/spark/README.md" // Should be some file > on your system > > > val conf = new SparkConf().setAppName("Simple Application") > > > > val sc = new SparkContext(conf) > > > > logData = sc.textFile(logFile, 2).cache() > > > > val numAs = logData.filter(line => line.contains("a")).count() > > > > val numBs = logData.filter(line => line.contains("b")).count() > > > > resultString = "Lines with a: %s, Lines with b: %s".format(numAs, > numBs) > > > println(resultString) > > > > } > > > > def 
getA(): Int = { > > > > println(resultString) > > > > if(null == logData) { > > > > println("**** logData is null!") > > > > -1 > > > > } else { > > > > val numAs = logData.filter(line => line.contains("a")).count().toInt > > > > println(s"**** numAs: $numAs") > > > > numAs > > > > } > > > > } > > > > } > > > > > > > > object SimpleAppLoader { > > > > def main(args: Array[String]) { > > > > run() > > > > } > > > > > > > > def run() { > > > > > > > > val clArgs = Array( > > > > "--deploy-mode", "client" > > > > , "--total-executor-cores", "2" > > > > , "--class", "SimpleApp" > > > > , "--conf", "spark.shuffle.spill=false" > > > > , "--conf", "spark.master=spark://troika:7077" > > > > , "--conf", "spark.driver.memory=128m" > > > > , "--conf", "spark.executor.memory=128m" > > > > , "--conf", "spark.eventLog.enabled=true" > > > > , "--conf", "spark.eventLog.dir=/home/ovik/logs" > > > > , SparkContext.jarOfClass(this.getClass).get) > > > > > > > > SparkSubmit.main(clArgs) > > > > > > > > val numAs = *SimpleApp.getA() // WORKS * > > > > > > > > > println(s"numAs is $numAs") > > > > } > > > > } > > > > > > >