I don't think you should call methods on sparkContext inside a map function; sparkContext lives on the driver side.

Cheers

On Mon, Jan 18, 2016 at 6:27 AM, Marco <marcu...@gmail.com> wrote:
> Hello,
>
> I am using Spark 1.5.1 within SBT, and Scala 2.10.6, and I am facing an
> issue with the SparkContext.
>
> Basically, I have an object that needs to do several things:
>
> - call an external service One (web api)
> - call an external service Two (another api)
> - read and produce an RDD from HDFS (Spark)
> - parallelize the data obtained in the first two calls
> - join these different RDDs, do stuff with them...
>
> Now, I am trying to do it in an asynchronous way. This doesn't seem to
> work, though. My guess is that Spark doesn't see the calls to .parallelize,
> as they are made in different tasks (or Futures), so this code runs
> earlier or later than expected, or maybe with an unset context (can that
> happen?). I have tried different approaches, one of them being a call to
> SparkEnv.set inside the flatMap and map calls (in the Future). However,
> all I get is "Cannot call methods on a stopped SparkContext". It just
> doesn't work; maybe I misunderstood what it does, so I removed it.
>
> This is the code I have written so far:
>
> object Fetcher {
>
>   def fetch(name, master, ...) = {
>     val externalCallOne: Future[WSResponse] = externalService1()
>     val externalCallTwo: Future[String] = externalService2()
>     // val sparkEnv = SparkEnv.get
>     val config = new SparkConf()
>       .setAppName(name)
>       .set("spark.master", master)
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>
>     val sparkContext = new SparkContext(config)
>     // val sparkEnv = SparkEnv.get
>
>     val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
>       // SparkEnv.set(sparkEnv)
>       externalCallTwo map { dataTwo =>
>         println("in map") // prints, so it gets here ...
>         val rddOne = sparkContext.parallelize(dataOne)
>         val rddTwo = sparkContext.parallelize(dataTwo)
>         // do stuff here ... foreach/println, and
>         val joinedData = rddOne leftOuterJoin (rddTwo)
>       }
>     }
>     eventuallyJoinedData onSuccess { case success => ... }
>     eventuallyJoinedData onFailure { case error => println(error.getMessage) }
>     // sparkContext.stop
>   }
> }
>
> As you can see, I have also tried to comment out the line that stops the
> context, but then I get another issue:
>
> 13:09:14.929 [ForkJoinPool-1-worker-5] INFO org.apache.spark.SparkContext - Starting job: count at Fetcher.scala:38
> 13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
> 13:09:14.936 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner - Error in cleaning thread
> java.lang.InterruptedException: null
>         at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
>         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) ~[na:1.8.0_65]
>         at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157) ~[spark-core_2.10-1.5.1.jar:1.5.1]
>         at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) [spark-core_2.10-1.5.1.jar:1.5.1]
>         at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154) [spark-core_2.10-1.5.1.jar:1.5.1]
>         at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67) [spark-core_2.10-1.5.1.jar:1.5.1]
> 13:09:14.940 [db-async-netty-thread-1] DEBUG io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
> 13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils - uncaught error in thread SparkListenerBus, stopping SparkContext
> java.lang.InterruptedException: null
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[na:1.8.0_65]
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[na:1.8.0_65]
>         at java.util.concurrent.Semaphore.acquire(Semaphore.java:312) ~[na:1.8.0_65]
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65) ~[spark-core_2.10-1.5.1.jar:1.5.1]
>         at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) ~[spark-core_2.10-1.5.1.jar:1.5.1]
>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) [spark-core_2.10-1.5.1.jar:1.5.1]
> 13:09:14.949 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping org.spark-project.jetty.server.Server@787cbcef
> 13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping SelectChannelConnector@0.0.0.0:4040
> 13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
>
> As you can see, it tries to run the count operation on the RDD, but then it
> fails (possibly because the SparkContext is null(?)).
>
> How do I address this issue? What needs to be done? Do I need to switch to
> a synchronous architecture?
>
> Thanks in advance.
>
> Kind regards,
> Marco
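Concretely, one way to restructure the code along these lines is to keep the two external calls asynchronous, but compose the futures and block for both results on the driver before touching the SparkContext, so that parallelize, the join, and stop all run in order on the driver thread. The sketch below is only an illustration of that Future composition: the data and the FetcherSketch/run names are hypothetical stand-ins for the two services, and the Spark calls are shown as comments because they assume a live SparkContext.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FetcherSketch {
  def run(): Int = {
    // Hypothetical stand-ins for the two external service calls.
    // Both futures start immediately, so the calls run concurrently.
    val externalCallOne: Future[Seq[(String, Int)]] = Future(Seq(("a", 1), ("b", 2)))
    val externalCallTwo: Future[Seq[(String, Int)]] = Future(Seq(("a", 10)))

    // Compose the two futures into one that yields both results.
    val bothCalls = for {
      dataOne <- externalCallOne
      dataTwo <- externalCallTwo
    } yield (dataOne, dataTwo)

    // Block on the driver until both external calls have finished,
    // BEFORE making any SparkContext calls.
    val (dataOne, dataTwo) = Await.result(bothCalls, 30.seconds)

    // Driver-side Spark calls would go here, e.g.:
    //   val rddOne = sparkContext.parallelize(dataOne)
    //   val rddTwo = sparkContext.parallelize(dataTwo)
    //   val joinedData = rddOne.leftOuterJoin(rddTwo)
    //   joinedData.count()
    //   sparkContext.stop()
    dataOne.size + dataTwo.size
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Blocking with Await.result also keeps the driver alive until the job is submitted, which avoids the stopped-context and InterruptedException symptoms that appear when the method returns (and the context is stopped) while the Spark job is still pending.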