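Thanks for the pointers. The issue was due to route caching by Spray: the
route is built only once, so the SimpleApp.getA() call placed outside of
complete() ran at route construction time and the same value was always
returned. Other than that, the program is working fine.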

On Mon, Jan 5, 2015 at 12:44 AM, Simon Chan <simonc...@gmail.com> wrote:

> Boromir,
>
> You may like to take a look at how we make Spray and Spark work
> together in the PredictionIO project:
> https://github.com/PredictionIO/PredictionIO
>
>
>
> Simon
>
> On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work <ches...@alpinenow.com>
> wrote:
>
>> Just a guess here, may not be correct.
>>
>>   Spray needs to start an Akka actor system; the Spark context also creates
>> an Akka actor system. Is it possible that there is some conflict?
>>
>>
>>
>> Sent from my iPad
>>
>> On Jan 4, 2015, at 7:42 PM, Boromir Widas <vcsub...@gmail.com> wrote:
>>
>> Hello,
>>
>> I am trying to launch a Spark app (client mode for a standalone cluster)
>> from a Spray server, using the following code.
>>
>> When I run it as
>>
>> $> java -cp <class paths> SprayServer
>>
>> the SimpleApp.getA() call from SprayService returns -1 (which means it
>> sees the logData RDD as null for HTTP requests), but the statements from
>> within SimpleAppLoader.run() get correct values from SimpleApp.getA().
>>
>> Any idea why the HTTP requests do not see the cached RDD? I have been
>> trying to debug this for some time but am not getting anywhere - any
>> pointers will be greatly appreciated.
>>
>> Thanks.
>>
>> //////////////////////// BEGIN SPRAY SERVER
>>
>> import akka.actor.{ActorSystem, Props}
>>
>>
>>
>> import akka.io.IO
>>
>>
>>
>> import spray.can.Http
>>
>>
>>
>>
>>
>>
>>
>> import akka.actor._
>>
>>
>>
>> import spray.routing.HttpService
>>
>>
>>
>> import scala.concurrent.ops
>>
>>
>>
>>
>>
>>
>>
>> object SprayServer {
>>
>>
>>
>>   def main(args: Array[String]) {
>>
>>
>>
>>     // we need an ActorSystem to host our service
>>
>>
>>
>>     implicit val system = ActorSystem()
>>
>>
>>
>>
>>
>>
>>
>>     //create our service actor
>>
>>
>>
>>     val service = system.actorOf(Props[SprayServiceActor],
>> "test-service")
>>
>>
>>
>>
>>
>>
>>
>>     //bind our actor to an HTTP port
>>
>>
>>
>>     IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)
>>
>>
>>
>>
>>
>>
>>
>>     ops.spawn {
>>
>>
>>
>>       *SimpleAppLoader.run() *
>>
>>
>>
>>     }
>>
>>
>>
>>   }
>>
>>
>>
>> }
>>
>>
>>
>>
>>
>>
>>
>> class SprayServiceActor extends SprayService with Actor {
>>
>>
>>
>>   // the HttpService trait (which SprayService will extend) defines
>>
>>
>>
>>   // only one abstract member, which connects the services environment
>>
>>
>>
>>   // to the enclosing actor or test.
>>
>>
>>
>>   def actorRefFactory = context
>>
>>
>>
>>
>>
>>
>>
>>   def receive = runRoute(rootRoute)
>>
>>
>>
>> }
>>
>>
>>
>>
>>
>>
>>
>> trait SprayService extends HttpService {
>>
>>
>>
>>
>>
>>
>>
>>   def default = path("") {
>>
>>
>>
>>     println("handling default route")
>>
>>
>>
>>     val numAs = *SimpleApp.getA()   // DOES NOT WORK   *
>>
>>
>>
>>
>>     get { complete(s"num A: $numAs") }
>>
>>
>>
>>   }
>>
>>
>>
>>
>>
>>
>>
>>   def pingRoute = path("ping") {
>>
>>
>>
>>     get { complete("pong!") }
>>
>>
>>
>>   }
>>
>>
>>
>>
>>
>>
>>
>>   def pongRoute = path("pong") {
>>
>>
>>
>>     get { complete("pong!?") }
>>
>>
>>
>>   }
>>
>>
>>
>>
>>
>>
>>
>>   def rootRoute = pingRoute ~ pongRoute ~ default
>>
>>
>>
>> }
>>
>>
>>
>>
>>
>>
>>
>> ////////// END SPRAY, BEGIN SPARK
>>
>>
>>
>>
>>
>>
>>
>> import org.apache.spark.SparkContext
>>
>>
>>
>> import org.apache.spark.SparkContext._
>>
>>
>>
>> import org.apache.spark.SparkConf
>>
>>
>>
>> import org.apache.spark.deploy.SparkSubmit
>>
>>
>>
>> import org.apache.spark.rdd.RDD
>>
>>
>>
>>
>>
>>
>>
>> object SimpleApp {
>>
>>
>>
>>   var resultString: String = "Data not assigned"
>>
>>
>>
>>   var logData: RDD[String] = null
>>
>>
>>
>>   def main(args: Array[String]) {
>>
>>
>>
>>     val logFile = "/home/ovik/src/spark/README.md" // Should be some file
>> on your system
>>
>>
>>     val conf = new SparkConf().setAppName("Simple Application")
>>
>>
>>
>>     val sc = new SparkContext(conf)
>>
>>
>>
>>     logData = sc.textFile(logFile, 2).cache()
>>
>>
>>
>>     val numAs = logData.filter(line => line.contains("a")).count()
>>
>>
>>
>>     val numBs = logData.filter(line => line.contains("b")).count()
>>
>>
>>
>>     resultString = "Lines with a: %s, Lines with b: %s".format(numAs,
>> numBs)
>>
>>
>>     println(resultString)
>>
>>
>>
>>   }
>>
>>
>>
>>   def getA(): Int = {
>>
>>
>>
>>     println(resultString)
>>
>>
>>
>>     if(null == logData) {
>>
>>
>>
>>       println("**** logData is null!")
>>
>>
>>
>>       -1
>>
>>
>>
>>     } else {
>>
>>
>>
>>       val numAs = logData.filter(line =>
>> line.contains("a")).count().toInt
>>
>>
>>
>>       println(s"**** numAs: $numAs")
>>
>>
>>
>>       numAs
>>
>>
>>
>>     }
>>
>>
>>
>>   }
>>
>>
>>
>> }
>>
>>
>>
>>
>>
>>
>>
>> object SimpleAppLoader {
>>
>>
>>
>>   def main(args: Array[String]) {
>>
>>
>>
>>     run()
>>
>>
>>
>>   }
>>
>>
>>
>>
>>
>>
>>
>>   def run() {
>>
>>
>>
>>
>>
>>
>>
>>     val clArgs = Array(
>>
>>
>>
>>       "--deploy-mode", "client"
>>
>>
>>
>>       , "--total-executor-cores", "2"
>>
>>
>>
>>       , "--class", "SimpleApp"
>>
>>
>>
>>       , "--conf", "spark.shuffle.spill=false"
>>
>>
>>
>>       , "--conf", "spark.master=spark://troika:7077"
>>
>>
>>
>>       , "--conf", "spark.driver.memory=128m"
>>
>>
>>
>>       , "--conf", "spark.executor.memory=128m"
>>
>>
>>
>>       , "--conf", "spark.eventLog.enabled=true"
>>
>>
>>
>>       , "--conf", "spark.eventLog.dir=/home/ovik/logs"
>>
>>
>>
>>       , SparkContext.jarOfClass(this.getClass).get)
>>
>>
>>
>>
>>
>>
>>
>>     SparkSubmit.main(clArgs)
>>
>>
>>
>>
>>
>>
>>
>>     val numAs = *SimpleApp.getA()    // WORKS     *
>>
>>
>>
>>
>>
>>
>>
>>
>>     println(s"numAs is $numAs")
>>
>>
>>
>>   }
>>
>>
>>
>> }
>>
>>
>>
>>
>>
>>
>>
>

Reply via email to