I am using Spark 1.5.1 with SBT and Scala 2.10.6, and I am writing because
I am facing an issue with the SparkContext.

Basically, I have an object that needs to do several things:

- call an external service One (a web API)
- call an external service Two (another API)
- read and produce an RDD from HDFS (Spark)
- parallelize the data obtained from the first two calls
- join these different RDDs and do stuff with them (a small sketch of the
  join step follows)...
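
To be concrete, by "join" I mean something like the following; the values
here are made up, and it assumes the service responses can be turned into
key/value pairs (given a SparkContext named sc):

  val rddOne = sc.parallelize(Seq(("a", 1), ("b", 2)))
  val rddTwo = sc.parallelize(Seq(("a", "x")))
  val joined = rddOne.leftOuterJoin(rddTwo)
  // => ("a", (1, Some("x"))), ("b", (2, None))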

Now, I am trying to do all of this asynchronously, but it doesn't seem to
work. My guess is that Spark doesn't see the calls to .parallelize because
they are made in different tasks (Futures), so the code runs earlier or
later than expected, or perhaps with an unset context (can that even
happen?). I have tried different approaches, one of them being a call to
SparkEnv.set inside the flatMap and map (i.e., inside the Future). However,
all I get is "Cannot call methods on a stopped SparkContext". It just
doesn't work; maybe I simply misunderstood what SparkEnv.set does, so I
removed it.
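
If it helps, here is a minimal, self-contained sketch of what I suspect is
happening; the names are illustrative, not from my real code:

  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  import org.apache.spark.{SparkConf, SparkContext}

  object StoppedContextRepro {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(
        new SparkConf().setAppName("repro").setMaster("local[*]"))

      Future {
        Thread.sleep(1000)              // stands in for the slow external calls
        sc.parallelize(1 to 10).count() // by now the context below is stopped
      }

      sc.stop() // the main thread does not wait for the Future
      // => java.lang.IllegalStateException:
      //    Cannot call methods on a stopped SparkContext
    }
  }

In my real code, the sleep corresponds to the time the external calls take,
and the stop is the sparkContext.stop at the end of fetch.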

This is the code I have written so far:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // for map/flatMap on Futures
import org.apache.spark.{SparkConf, SparkContext}

object Fetcher {

  def fetch(name: String, master: String /* , ... other parameters elided */): Unit = {
    val externalCallOne: Future[WSResponse] = externalService1()
    val externalCallTwo: Future[String]     = externalService2()
    // val sparkEnv = SparkEnv.get

    val config = new SparkConf()
      .setAppName(name)
      .set("spark.master", master)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sparkContext = new SparkContext(config)
    // val sparkEnv = SparkEnv.get

    val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
      // SparkEnv.set(sparkEnv)
      externalCallTwo map { dataTwo =>
        println("in map") // prints, so execution does get here ...
        // simplified: the real code first converts the responses to key/value sequences
        val rddOne = sparkContext.parallelize(dataOne)
        val rddTwo = sparkContext.parallelize(dataTwo)
        // do stuff here ... foreach/println, and
        val joinedData = rddOne leftOuterJoin rddTwo
        joinedData
      }
    }

    eventuallyJoinedData onSuccess { case success => () // ... (elided)
    }
    eventuallyJoinedData onFailure { case error => println(error.getMessage) }

    // sparkContext.stop()
  }
}
As you can see, I have also tried commenting out the line that stops the
context, but then I get another issue:

13:09:14.929 [ForkJoinPool-1-worker-5] INFO  org.apache.spark.SparkContext -
Starting job: count at Fetcher.scala:38
13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop -
Selector.select() returned prematurely because
Thread.currentThread().interrupt() was called. Use
NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
13:09:14.936 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner -
Error in cleaning thread
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
~[na:1.8.0_65]
    at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157)
~[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
[spark-core_2.10-1.5.1.jar:1.5.1]
    at
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
[spark-core_2.10-1.5.1.jar:1.5.1]
13:09:14.940 [db-async-netty-thread-1] DEBUG
io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely
because Thread.currentThread().interrupt() was called. Use
NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils - uncaught
error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException: null
    at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
~[na:1.8.0_65]
    at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
~[na:1.8.0_65]
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
~[na:1.8.0_65]
    at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65)
~[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
~[spark-core_2.10-1.5.1.jar:1.5.1]
    at
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
[spark-core_2.10-1.5.1.jar:1.5.1]
13:09:14.949 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle -
stopping org.spark-project.jetty.server.Server@787cbcef
13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle -
stopping SelectChannelConnector@0.0.0.0:4040
13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle -
stopping
org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
As you can see, it tries to start the count job on the RDD, but then it
fails (possibly because the SparkContext is null?).

How do I address this issue? What needs to be done? Do I need to switch to a
synchronous architecture?
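
By "synchronous" I mean, for example, blocking on the future before stopping
the context. A minimal sketch against the fetch method above (assuming
eventuallyJoinedData and sparkContext are in scope):

  import scala.concurrent.Await
  import scala.concurrent.duration.Duration

  // block the calling thread until the asynchronous pipeline has finished
  val joinedData = Await.result(eventuallyJoinedData, Duration.Inf)
  // only now is it safe to stop the context: no work is still pending on it
  sparkContext.stop()

But blocking like this defeats the purpose of using Futures in the first
place, so I would prefer a properly asynchronous solution if one exists.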

Thanks in advance.

Kind regards,
Marco 

PS: I apologize if I have already sent this message to the mailing list, but
I have only just discovered that Nabble can also be used to post (normally,
I only use it to read). I am not entirely sure how all this is handled, but
I see many topics in the mailing list that are not yet approved, so I
suppose this is the "way". If not, please just delete this message.


