Thank you guys for the answers.

@Ted Yu: You are right: in general, the code that fetches data externally should run separately, and Spark should only access the data written by these two services via Flume/Kafka/whatever. However, before I get there, I would like to have the Spark job itself ready.
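Just to make the plan concrete, this is roughly the kind of consumer I have in mind for later: the two services publish what they fetch, and the Spark job only reads what was already written. This is only a sketch, assuming the spark-streaming-kafka_2.10 artifact from Spark 1.5; the broker address and topic names are made up:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaFetcher {
      def main(args: Array[String]): Unit = {
        // local master just for the sketch; streaming needs >= 2 cores
        val conf = new SparkConf().setAppName("fetcher").setMaster("local[2]")
        val ssc  = new StreamingContext(conf, Seconds(30))

        // The two external services would write here; broker and topic
        // names are placeholders.
        val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
        val topics      = Set("service-one", "service-two")

        val stream = KafkaUtils.createDirectStream[
          String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

        // Spark only consumes data that was already written; no web calls
        // happen inside the job itself.
        stream.map(_._2).foreachRDD { rdd =>
          println(s"batch size: ${rdd.count()}")
          // join with the HDFS data, etc., would go here
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

But as said, that is the next step; first I want the job to behave.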
@Shixiong Zhu: I imagined something like that, and I must say I suspected from the beginning that SparkContext cannot be called from Futures in general. Trying it out gave me the confirmation I needed, so it seems I was right in that assumption. Unfortunately, I don't have a reproducer, but I would say it's enough to create a single Future and call sparkContext from there.

Thanks again for the answers.

Kind regards,
Marco

2016-01-18 19:37 GMT+01:00 Shixiong(Ryan) Zhu <shixi...@databricks.com>:

> Hey Marco,
>
> Since the code in the Future runs asynchronously, you cannot call
> "sparkContext.stop" at the end of "fetch", because the code in the
> Future may not have finished yet.
>
> However, the exception seems weird. Do you have a simple reproducer?
>
> On Mon, Jan 18, 2016 at 9:13 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>>     externalCallTwo map { dataTwo =>
>>       println("in map") // prints, so it gets here ...
>>       val rddOne = sparkContext.parallelize(dataOne)
>>
>> I don't think you should call methods on sparkContext in a map
>> function; sparkContext lives on the driver side.
>>
>> Cheers
>>
>> On Mon, Jan 18, 2016 at 6:27 AM, Marco <marcu...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using Spark 1.5.1 within SBT, with Scala 2.10.6, and I am
>>> facing an issue with the SparkContext.
>>>
>>> Basically, I have an object that needs to do several things:
>>>
>>> - call an external service One (web API)
>>> - call an external service Two (another API)
>>> - read and produce an RDD from HDFS (Spark)
>>> - parallelize the data obtained in the first two calls
>>> - join these different RDDs, do stuff with them...
>>>
>>> Now, I am trying to do it in an asynchronous way. This doesn't seem
>>> to work, though. My guess is that Spark doesn't see the calls to
>>> .parallelize, as they are made in different tasks (or Futures, so this
>>> code runs earlier or later than expected, or maybe with an unset
>>> context; can that happen?). I have tried different approaches, one of
>>> them being a call to SparkEnv.set inside the flatMap and map calls (in
>>> the Futures). However, all I get is "Cannot call methods on a stopped
>>> SparkContext". It just doesn't work; maybe I misunderstood what
>>> SparkEnv.set does, so I removed it.
>>>
>>> This is the code I have written so far:
>>>
>>> object Fetcher {
>>>
>>>   def fetch(name, master, ...) = {
>>>     val externalCallOne: Future[WSResponse] = externalService1()
>>>     val externalCallTwo: Future[String] = externalService2()
>>>     // val sparkEnv = SparkEnv.get
>>>     val config = new SparkConf()
>>>       .setAppName(name)
>>>       .set("spark.master", master)
>>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>
>>>     val sparkContext = new SparkContext(config)
>>>     // val sparkEnv = SparkEnv.get
>>>
>>>     val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
>>>       // SparkEnv.set(sparkEnv)
>>>       externalCallTwo map { dataTwo =>
>>>         println("in map") // prints, so it gets here ...
>>>         val rddOne = sparkContext.parallelize(dataOne)
>>>         val rddTwo = sparkContext.parallelize(dataTwo)
>>>         // do stuff here ... foreach/println, and
>>>
>>>         val joinedData = rddOne leftOuterJoin (rddTwo)
>>>       }
>>>     }
>>>     eventuallyJoinedData onSuccess { case success => ...
>>>     }
>>>     eventuallyJoinedData onFailure { case error =>
>>>       println(error.getMessage)
>>>     }
>>>     // sparkContext.stop
>>>   }
>>>
>>> }
>>>
>>> As you can see, I have also tried commenting out the line that stops
>>> the context, but then I get another issue:
>>>
>>> 13:09:14.929 [ForkJoinPool-1-worker-5] INFO org.apache.spark.SparkContext - Starting job: count at Fetcher.scala:38
>>> 13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>>> 13:09:14.936 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner - Error in cleaning thread
>>> java.lang.InterruptedException: null
>>>   at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
>>>   at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) ~[na:1.8.0_65]
>>>   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157) ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) [spark-core_2.10-1.5.1.jar:1.5.1]
>>>   at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154) [spark-core_2.10-1.5.1.jar:1.5.1]
>>>   at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67) [spark-core_2.10-1.5.1.jar:1.5.1]
>>> 13:09:14.940 [db-async-netty-thread-1] DEBUG io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>>> 13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils - uncaught error in thread SparkListenerBus, stopping SparkContext
>>> java.lang.InterruptedException: null
>>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[na:1.8.0_65]
>>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[na:1.8.0_65]
>>>   at java.util.concurrent.Semaphore.acquire(Semaphore.java:312) ~[na:1.8.0_65]
>>>   at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65) ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>>   at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) [spark-core_2.10-1.5.1.jar:1.5.1]
>>> 13:09:14.949 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping org.spark-project.jetty.server.Server@787cbcef
>>> 13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping SelectChannelConnector@0.0.0.0:4040
>>> 13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
>>>
>>> As you can see, it tries to call the count operation on the RDD, but
>>> then it fails (possibly because the SparkContext is null?).
>>>
>>> How do I address this issue? What needs to be done? Do I need to
>>> switch to a synchronous architecture?
>>>
>>> Thanks in advance.
>>>
>>> Kind regards,
>>> Marco
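P.S. Following Shixiong's hint, this is roughly how I intend to restructure the end of fetch, so the context is only stopped once the joined future has actually completed. Just a sketch against the code above (eventuallyJoinedData and sparkContext as defined there); the timeout is an arbitrary placeholder:

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import scala.util.{Failure, Success, Try}

    // Block the calling thread until the asynchronous work is done
    // (the 10-minute timeout is a placeholder), then stop the context.
    Try(Await.result(eventuallyJoinedData, 10.minutes)) match {
      case Success(result) => println(s"joined data ready: $result")
      case Failure(error)  => println(error.getMessage)
    }
    sparkContext.stop()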