Hey Marco,

Since the code inside the Future runs asynchronously, you cannot call
"sparkContext.stop" at the end of "fetch": the work inside the Future may
not have finished yet.
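
For example, a rough sketch (assuming "eventuallyJoinedData" is the Future
from your "fetch" method below, and that blocking the calling thread until
the work completes is acceptable):

import scala.concurrent.Await
import scala.concurrent.duration._

// Block until the asynchronous work finishes (or the timeout expires),
// and only then stop the context, since nothing else is using it.
Await.result(eventuallyJoinedData, 10.minutes)
sparkContext.stop()

Alternatively, call "sparkContext.stop" from an onComplete callback so that
both the success and the failure path shut the context down.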

However, the exception seems weird. Do you have a simple reproducer?


On Mon, Jan 18, 2016 at 9:13 AM, Ted Yu <yuzhih...@gmail.com> wrote:

>       externalCallTwo map { dataTwo =>
>         println("in map") // prints, so it gets here ...
>         val rddOne = sparkContext.parallelize(dataOne)
>
> I don't think you should call methods on sparkContext inside the map
> function. sparkContext lives on the driver side.
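>
> A rough sketch of what I mean (hypothetical; futureOne and futureTwo stand
> in for your external calls after their responses have been mapped to plain
> Scala collections):
>
> import scala.concurrent.Await
> import scala.concurrent.duration._
>
> // futureOne / futureTwo are hypothetical stand-ins of type
> // Future[Seq[String]]. Resolve them on the driver first ...
> val dataOne: Seq[String] = Await.result(futureOne, 5.minutes)
> val dataTwo: Seq[String] = Await.result(futureTwo, 5.minutes)
>
> // ... so that parallelize runs on the driver, outside any Future callback.
> val rddOne = sparkContext.parallelize(dataOne)
> val rddTwo = sparkContext.parallelize(dataTwo)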
>
> Cheers
>
> On Mon, Jan 18, 2016 at 6:27 AM, Marco <marcu...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using Spark 1.5.1 (via SBT) with Scala 2.10.6, and I am facing an
>> issue with the SparkContext.
>>
>> Basically, I have an object that needs to do several things:
>>
>> - call external service One (a web API)
>> - call external service Two (another API)
>> - read and produce an RDD from HDFS (Spark)
>> - parallelize the data obtained in the first two calls
>> - join these different RDDs, do stuff with them...
>>
>> Now, I am trying to do this asynchronously, but it doesn't seem to work.
>> My guess is that Spark doesn't see the calls to .parallelize, as they are
>> made in different tasks (or Futures), so the code may run earlier/later
>> than expected, or perhaps with an unset context (can that happen?). I have
>> tried different approaches, one of them being a call to SparkEnv.set
>> inside the flatMap and map calls (in the Future). However, all I get is
>> "Cannot call methods on a stopped SparkContext". It just doesn't work -
>> maybe I misunderstood what SparkEnv.set does, so I removed it.
>>
>> This is the code I have written so far:
>>
>> object Fetcher {
>>
>>   def fetch(name: String, master: String, ...) = {
>>     val externalCallOne: Future[WSResponse] = externalService1()
>>     val externalCallTwo: Future[String] = externalService2()
>>     // val sparkEnv = SparkEnv.get
>>     val config = new SparkConf()
>>       .setAppName(name)
>>       .set("spark.master", master)
>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>
>>     val sparkContext = new SparkContext(config)
>>     //val sparkEnv = SparkEnv.get
>>
>>     val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
>>       // SparkEnv.set(sparkEnv)
>>       externalCallTwo map { dataTwo =>
>>         println("in map") // prints, so it gets here ...
>>         val rddOne = sparkContext.parallelize(dataOne)
>>         val rddTwo = sparkContext.parallelize(dataTwo)
>>         // do stuff here ... foreach/println, and
>>
>>         val joinedData = rddOne leftOuterJoin (rddTwo)
>>       }
>>     }
>>     eventuallyJoinedData onSuccess { case success => ... }
>>     eventuallyJoinedData onFailure { case error =>
>>       println(error.getMessage)
>>     }
>>     // sparkContext.stop
>>   }
>>
>> }
>> As you can see, I have also tried commenting out the line that stops the
>> context, but then I get another issue:
>>
>> 13:09:14.929 [ForkJoinPool-1-worker-5] INFO
>>  org.apache.spark.SparkContext - Starting job: count at Fetcher.scala:38
>> 13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop -
>> Selector.select() returned prematurely because
>> Thread.currentThread().interrupt() was called. Use
>> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>> 13:09:14.936 [Spark Context Cleaner] ERROR
>> org.apache.spark.ContextCleaner - Error in cleaning thread
>> java.lang.InterruptedException: null
>>     at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
>>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>> ~[na:1.8.0_65]
>>     at
>> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157)
>> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>     at
>> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
>> [spark-core_2.10-1.5.1.jar:1.5.1]
>>     at 
>> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
>> [spark-core_2.10-1.5.1.jar:1.5.1]
>>     at
>> org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
>> [spark-core_2.10-1.5.1.jar:1.5.1]
>> 13:09:14.940 [db-async-netty-thread-1] DEBUG
>> io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely
>> because Thread.currentThread().interrupt() was called. Use
>> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>> 13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils -
>> uncaught error in thread SparkListenerBus, stopping SparkContext
>> java.lang.InterruptedException: null
>>     at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>> ~[na:1.8.0_65]
>>     at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>> ~[na:1.8.0_65]
>>     at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
>> ~[na:1.8.0_65]
>>     at
>> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65)
>> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>     at
>> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
>> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>     at
>> org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
>> [spark-core_2.10-1.5.1.jar:1.5.1]
>> 13:09:14.949 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle
>> - stopping org.spark-project.jetty.server.Server@787cbcef
>> 13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle
>> - stopping SelectChannelConnector@0.0.0.0:4040
>> 13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle
>> - stopping
>> org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
>> As you can see, it tries to run the count operation on the RDD, but then
>> it fails (possibly because the SparkContext is null?).
>>
>> How do I address this issue? What needs to be done? Do I need to switch
>> to a synchronous architecture?
>>
>> Thanks in advance.
>>
>> Kind regards,
>> Marco
>>
>>
>>
>
