Boromir,

You may like to take a look at how we make Spray and Spark work together
at the PredictionIO project: https://github.com/PredictionIO/PredictionIO
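
The general pattern there is to create one long-lived SparkContext when the
web server starts and let the Spray routes query it directly, rather than
going through SparkSubmit from a background thread. Very roughly, and only as
a sketch (this is not the PredictionIO code; the master URL and file path are
taken from your example):

import akka.actor.{Actor, ActorSystem, Props}
import akka.io.IO
import spray.can.Http
import spray.routing.HttpService
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Spark work lives in a standalone object so the closures shipped to the
// executors do not capture the actor instance.
object SparkOps {
  def countLinesWith(data: RDD[String], token: String): Long =
    data.filter(line => line.contains(token)).count()
}

// The service actor takes the cached RDD as a constructor argument.
class CountServiceActor(logData: RDD[String]) extends Actor with HttpService {
  def actorRefFactory = context
  def receive = runRoute {
    path("") {
      get { complete(s"num A: ${SparkOps.countLinesWith(logData, "a")}") }
    }
  }
}

object EmbeddedSparkServer {
  def main(args: Array[String]) {
    // One driver-side SparkContext for the lifetime of the server.
    val conf = new SparkConf()
      .setAppName("Simple Application")
      .setMaster("spark://troika:7077")
    val sc = new SparkContext(conf)
    val logData = sc.textFile("/home/ovik/src/spark/README.md", 2).cache()

    // Spray runs in the same JVM as the driver and simply uses the cached RDD.
    implicit val system = ActorSystem("embedded-spark-server")
    val service =
      system.actorOf(Props(new CountServiceActor(logData)), "count-service")
    IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)
  }
}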



Simon

On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work <ches...@alpinenow.com>
wrote:

> Just a guess here; it may not be correct.
>
> Spray needs to start an Akka actor system, and the Spark context also
> creates an Akka actor system. Is it possible there is some conflict?
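
If that turns out to be the cause, one cheap thing to check - only a sketch
and a guess on my part as well - is to give the Spray ActorSystem an explicit
name and its own configuration instead of the default ActorSystem() call, so
it cannot pick up whatever settings Spark's internal driver actor system uses:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// A dedicated, explicitly named system for Spray; the Spark driver creates
// its own internal actor system with its own configuration.
val sprayConf = ConfigFactory.parseString("akka.loglevel = INFO")
  .withFallback(ConfigFactory.load())
implicit val system = ActorSystem("spray-http", sprayConf)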
>
>
>
> Sent from my iPad
>
> On Jan 4, 2015, at 7:42 PM, Boromir Widas <vcsub...@gmail.com> wrote:
>
> Hello,
>
> I am trying to launch a Spark app (client mode for a standalone cluster)
> from a Spray server, using the following code.
>
> When I run it as
>
> $> java -cp <class paths> SprayServer
>
> the SimpleApp.getA() call from SprayService returns -1 (which means it
> sees the logData RDD as null for HTTP requests), but the statements from
> within SimpleAppLoader.run() get correct values from SimpleApp.getA().
>
> Any idea why the HTTP requests do not see the cached RDD? I have been
> trying to debug this for some time but am not getting anywhere - any
> pointers will be greatly appreciated.
>
> Thanks.
>
> //////////////////////// BEGIN SPRAY SERVER
>
> import akka.actor.{ActorSystem, Props}
> import akka.io.IO
> import spray.can.Http
> import akka.actor._
> import spray.routing.HttpService
> import scala.concurrent.ops
>
> object SprayServer {
>
>   def main(args: Array[String]) {
>
>     // we need an ActorSystem to host our service
>     implicit val system = ActorSystem()
>
>     // create our service actor
>     val service = system.actorOf(Props[SprayServiceActor], "test-service")
>
>     // bind our actor to an HTTP port
>     IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)
>
>     ops.spawn {
>       SimpleAppLoader.run()
>     }
>   }
> }
>
> class SprayServiceActor extends SprayService with Actor {
>
>   // the HttpService trait (which SprayService will extend) defines
>   // only one abstract member, which connects the service's environment
>   // to the enclosing actor or test.
>   def actorRefFactory = context
>
>   def receive = runRoute(rootRoute)
> }
>
> trait SprayService extends HttpService {
>
>   def default = path("") {
>     println("handling default route")
>     val numAs = SimpleApp.getA()   // DOES NOT WORK
>     get { complete(s"num A: $numAs") }
>   }
>
>   def pingRoute = path("ping") {
>     get { complete("pong!") }
>   }
>
>   def pongRoute = path("pong") {
>     get { complete("pong!?") }
>   }
>
>   def rootRoute = pingRoute ~ pongRoute ~ default
> }
>
> ////////// END SPRAY, BEGIN SPARK
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.deploy.SparkSubmit
> import org.apache.spark.rdd.RDD
>
> object SimpleApp {
>
>   var resultString: String = "Data not assigned"
>   var logData: RDD[String] = null
>
>   def main(args: Array[String]) {
>     val logFile = "/home/ovik/src/spark/README.md" // Should be some file on your system
>     val conf = new SparkConf().setAppName("Simple Application")
>     val sc = new SparkContext(conf)
>     logData = sc.textFile(logFile, 2).cache()
>     val numAs = logData.filter(line => line.contains("a")).count()
>     val numBs = logData.filter(line => line.contains("b")).count()
>     resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
>     println(resultString)
>   }
>
>   def getA(): Int = {
>     println(resultString)
>     if(null == logData) {
>       println("**** logData is null!")
>       -1
>     } else {
>       val numAs = logData.filter(line => line.contains("a")).count().toInt
>       println(s"**** numAs: $numAs")
>       numAs
>     }
>   }
> }
>
> object SimpleAppLoader {
>
>   def main(args: Array[String]) {
>     run()
>   }
>
>   def run() {
>
>     val clArgs = Array(
>       "--deploy-mode", "client"
>       , "--total-executor-cores", "2"
>       , "--class", "SimpleApp"
>       , "--conf", "spark.shuffle.spill=false"
>       , "--conf", "spark.master=spark://troika:7077"
>       , "--conf", "spark.driver.memory=128m"
>       , "--conf", "spark.executor.memory=128m"
>       , "--conf", "spark.eventLog.enabled=true"
>       , "--conf", "spark.eventLog.dir=/home/ovik/logs"
>       , SparkContext.jarOfClass(this.getClass).get)
>
>     SparkSubmit.main(clArgs)
>
>     val numAs = SimpleApp.getA()    // WORKS
>
>     println(s"numAs is $numAs")
>   }
> }
