Thank you guys for the answers.

@Ted Yu: You are right, in general the code that fetches data from the
external services should run separately, and Spark should only read the
data those two services write via Flume/Kafka/whatever. However, before I
get there, I would like to have the Spark job ready.

@Shixiong Zhu: I imagined something like that, and I must say that I
suspected from the beginning that SparkContext could not be called from
within Futures in general. I tried it anyway, and the result seems to
confirm that assumption. Unfortunately, I don't have a reproducer, but I
would say that it's enough to create one Future and call sparkContext
from there.
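
For the archive, here is a minimal sketch of what Ryan's reply points to,
in case the problem is only the early stop rather than Futures as such. It
is not my actual code and is untested against the real services:
externalService1 and externalService2 are hypothetical stand-ins that
return fixed data, and local[2] is just a placeholder master. The idea is
to keep the two calls asynchronous, but to block on the combined Future
before stopping the context:

```scala
import org.apache.spark.{SparkConf, SparkContext}

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FetcherSketch {

  // Hypothetical stand-ins for the two external services (fixed data).
  def externalService1(): Future[Seq[(String, Int)]] =
    Future(Seq("a" -> 1, "b" -> 2))

  def externalService2(): Future[Seq[(String, String)]] =
    Future(Seq("a" -> "x"))

  def run(): Seq[(String, (Int, Option[String]))] = {
    val conf = new SparkConf()
      .setAppName("fetcher-sketch")
      .setMaster("local[2]") // placeholder master, an assumption
    val sparkContext = new SparkContext(conf)

    try {
      // Start both calls before combining them so they run concurrently.
      val callOne = externalService1()
      val callTwo = externalService2()

      val eventuallyJoined = for {
        dataOne <- callOne
        dataTwo <- callTwo
      } yield {
        val rddOne = sparkContext.parallelize(dataOne)
        val rddTwo = sparkContext.parallelize(dataTwo)
        // collect() is an action, so the Spark job runs inside the Future.
        rddOne.leftOuterJoin(rddTwo).collect().toSeq
      }

      // Block the calling thread until the Future has completed.
      Await.result(eventuallyJoined, 1.minute)
    } finally {
      // The context is stopped only after the Future has finished or failed.
      sparkContext.stop()
    }
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

The one-minute timeout is arbitrary. Blocking gives up some of the
asynchrony, but it guarantees that stop is not called while a job
submitted from a Future is still running.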

Thanks again for the answers.

Kind regards,
Marco

2016-01-18 19:37 GMT+01:00 Shixiong(Ryan) Zhu <shixi...@databricks.com>:

> Hey Marco,
>
> Since the code in the Future runs asynchronously, you cannot call
> "sparkContext.stop" at the end of "fetch", because the code in the Future
> may not have finished yet.
>
> However, the exception seems weird. Do you have a simple reproducer?
>
>
> On Mon, Jan 18, 2016 at 9:13 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>>       externalCallTwo map { dataTwo =>
>>         println("in map") // prints, so it gets here ...
>>         val rddOne = sparkContext.parallelize(dataOne)
>>
>> I don't think you should call methods on sparkContext inside the map
>> function. sparkContext lives on the driver side.
>>
>> Cheers
>>
>> On Mon, Jan 18, 2016 at 6:27 AM, Marco <marcu...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using Spark 1.5.1 with SBT and Scala 2.10.6, and I am facing an
>>> issue with the SparkContext.
>>>
>>> Basically, I have an object that needs to do several things:
>>>
>>> - call an external service One (web api)
>>> - call an external service Two (another api)
>>> - read and produce an RDD from HDFS (Spark)
>>> - parallelize the data obtained in the first two calls
>>> - join these different rdds, do stuff with them...
>>>
>>> Now, I am trying to do it in an asynchronous way. This doesn't seem to
>>> work, though. My guess is that Spark doesn't see the calls to .parallelize,
>>> as they are made in different tasks (or Futures), so this code is
>>> called earlier/later, or maybe with an unset context (can it be?). I have
>>> tried different ways, one of these being the call to SparkEnv.set in the
>>> calls to flatMap and map (in the Future). However, all I get is "Cannot call
>>> methods on a stopped SparkContext". It just doesn't work - maybe I just
>>> misunderstood what it does, so I removed it.
>>>
>>> This is the code I have written so far:
>>>
>>> object Fetcher {
>>>
>>>   def fetch(name, master, ...) = {
>>>     val externalCallOne: Future[WSResponse] = externalService1()
>>>     val externalCallTwo: Future[String] = externalService2()
>>>     // val sparkEnv = SparkEnv.get
>>>     val config = new SparkConf()
>>>       .setAppName(name)
>>>       .set("spark.master", master)
>>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>
>>>     val sparkContext = new SparkContext(config)
>>>     //val sparkEnv = SparkEnv.get
>>>
>>>     val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
>>>       // SparkEnv.set(sparkEnv)
>>>       externalCallTwo map { dataTwo =>
>>>         println("in map") // prints, so it gets here ...
>>>         val rddOne = sparkContext.parallelize(dataOne)
>>>         val rddTwo = sparkContext.parallelize(dataTwo)
>>>         // do stuff here ... foreach/println, and
>>>
>>>         val joinedData = rddOne leftOuterJoin (rddTwo)
>>>       }
>>>     }
>>>     eventuallyJoinedData onSuccess { case success => ...  }
>>>     eventuallyJoinedData onFailure { case error => println(error.getMessage) }
>>>     // sparkContext.stop
>>>   }
>>>
>>> }
>>>
>>> As you can see, I have also tried commenting out the line that stops the
>>> context, but then I get another issue:
>>>
>>> 13:09:14.929 [ForkJoinPool-1-worker-5] INFO
>>>  org.apache.spark.SparkContext - Starting job: count at Fetcher.scala:38
>>> 13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop
>>> - Selector.select() returned prematurely because
>>> Thread.currentThread().interrupt() was called. Use
>>> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>>> 13:09:14.936 [Spark Context Cleaner] ERROR
>>> org.apache.spark.ContextCleaner - Error in cleaning thread
>>> java.lang.InterruptedException: null
>>>     at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
>>>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>>> ~[na:1.8.0_65]
>>>     at
>>> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157)
>>> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>>     at
>>> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
>>> [spark-core_2.10-1.5.1.jar:1.5.1]
>>>     at 
>>> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
>>> [spark-core_2.10-1.5.1.jar:1.5.1]
>>>     at
>>> org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
>>> [spark-core_2.10-1.5.1.jar:1.5.1]
>>> 13:09:14.940 [db-async-netty-thread-1] DEBUG
>>> io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely
>>> because Thread.currentThread().interrupt() was called. Use
>>> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>>> 13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils -
>>> uncaught error in thread SparkListenerBus, stopping SparkContext
>>> java.lang.InterruptedException: null
>>>     at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>>> ~[na:1.8.0_65]
>>>     at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>>> ~[na:1.8.0_65]
>>>     at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
>>> ~[na:1.8.0_65]
>>>     at
>>> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65)
>>> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>>     at
>>> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
>>> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>>     at
>>> org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
>>> [spark-core_2.10-1.5.1.jar:1.5.1]
>>> 13:09:14.949 [SparkListenerBus] DEBUG
>>> o.s.j.u.component.AbstractLifeCycle - stopping
>>> org.spark-project.jetty.server.Server@787cbcef
>>> 13:09:14.959 [SparkListenerBus] DEBUG
>>> o.s.j.u.component.AbstractLifeCycle - stopping
>>> SelectChannelConnector@0.0.0.0:4040
>>> 13:09:14.959 [SparkListenerBus] DEBUG
>>> o.s.j.u.component.AbstractLifeCycle - stopping
>>> org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
>>>
>>> As you can see, it tries to call the count operation on the RDD, but
>>> then it fails (possibly because the SparkContext is null?).
>>>
>>> How do I address this issue? What needs to be done? Do I need to switch
>>> to a synchronous architecture?
>>>
>>> Thanks in advance.
>>>
>>> Kind regards,
>>> Marco
>>>
>>>
>>>
>>
>
