
You may like to take a look at how we make Spray and Spark working together
at the PredictionIO project:


On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work <>

> Just a guess here, may not be correct.
>   Spray needs to start akka actor system; spark context also creates an
> akka actor system, is it possible there are some conflict ?
> On Jan 4, 2015, at 7:42 PM, Boromir Widas <> wrote:
> Hello,
> I am trying to launch a Spark app(client mode for standalone cluster) from
> a Spray server, using the following code.
> When I run it as
> $> java -cp <class paths> SprayServer
> the SimpleApp.getA() call from  SprayService returns -1(which means it
> sees the logData RDD as null for HTTP requests), but the statements from
> within get correct values from SimpleApp.getA().
> Any idea why the HTTP requests do not see the cached RDD? I have been
> trying to debug this for some time but not getting anywhere - any pointers
> will be greatly appreciated.
> Thanks.
> //////////////////////// BEGIN SPRAY SERVER
> import{ActorSystem, Props}
> import
> import spray.can.Http
> import
> import spray.routing.HttpService
> import scala.concurrent.ops
> object SprayServer {
>   def main(args: Array[String]) {
>     // we need an ActorSystem to host our service
>     implicit val system = ActorSystem()
>     //create our service actor
>     val service = system.actorOf(Props[SprayServiceActor], "test-service")
>     //bind our actor to an HTTP port
>     IO(Http) ! Http.Bind(service, interface = "", port = 8085)
>     ops.spawn {
>       * *
>     }
>   }
> }
> class SprayServiceActor extends SprayService with Actor {
>   // the HttpService trait (which SprayService will extend) defines
>   // only one abstract member, which connects the services environment
>   // to the enclosing actor or test.
>   def actorRefFactory = context
>   def receive = runRoute(rootRoute)
> }
> trait SprayService extends HttpService {
>   def default = path("") {
>     println("handling default route")
>     val numAs = *SimpleApp.getA()   // DOES NOT WORK   *
>     get { complete(s"num A: $numAs") }
>   }
>   def pingRoute = path("ping") {
>     get { complete("pong!") }
>   }
>   def pongRoute = path("pong") {
>     get { complete("pong!?") }
>   }
>   def rootRoute = pingRoute ~ pongRoute ~ default
> }
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.deploy.SparkSubmit
> import org.apache.spark.rdd.RDD
> object SimpleApp {
>   var resultString: String = "Data not assigned"
>   var logData: RDD[String] = null
>   def main(args: Array[String]) {
>     val logFile = "/home/ovik/src/spark/" // Should be some file
> on your system
>     val conf = new SparkConf().setAppName("Simple Application")
>     val sc = new SparkContext(conf)
>     logData = sc.textFile(logFile, 2).cache()
>     val numAs = logData.filter(line => line.contains("a")).count()
>     val numBs = logData.filter(line => line.contains("b")).count()
>     resultString = "Lines with a: %s, Lines with b: %s".format(numAs,
> numBs)
>     println(resultString)
>   }
>   def getA(): Int = {
>     println(resultString)
>     if(null == logData) {
>       println("**** logData is null!")
>       -1
>     } else {
>       val numAs = logData.filter(line => line.contains("a")).count().toInt
>       println(s"**** numAs: $numAs")
>       numAs
>     }
>   }
> }
> object SimpleAppLoader {
>   def main(args: Array[String]) {
>     run()
>   }
>   def run() {
>     val clArgs = Array(
>       "--deploy-mode", "client"
>       , "--total-executor-cores", "2"
>       , "--class", "SimpleApp"
>       , "--conf", "spark.shuffle.spill=false"
>       , "--conf", "spark.master=spark://troika:7077"
>       , "--conf", "spark.driver.memory=128m"
>       , "--conf", "spark.executor.memory=128m"
>       , "--conf", "spark.eventLog.enabled=true"
>       , "--conf", "spark.eventLog.dir=/home/ovik/logs"
>       , SparkContext.jarOfClass(this.getClass).get)
>     SparkSubmit.main(clArgs)
>     val numAs = *SimpleApp.getA()    // WORKS     *
>     println(s"numAs is $numAs")
>   }
> }

