externalCallTwo map { dataTwo =>
        println("in map") // prints, so it gets here ...
        val rddOne = sparkContext.parallelize(dataOne)

I don't think you should call method on sparkContext in map function.
sparkContext lives on driver side.

Cheers

On Mon, Jan 18, 2016 at 6:27 AM, Marco <marcu...@gmail.com> wrote:

> Hello,
>
> I am using Spark 1.5.1 within SBT, and Scala 2.10.6 and I am facing an
> issue with the SparkContext.
>
> Basically, I have an object that needs to do several things:
>
> - call an external service One (web api)
> - call an external service Two (another api)
> - read and produce an RDD from HDFS (Spark)
> - parallelize the data obtained in the first two calls
> - join these different rdds, do stuff with them...
>
> Now, I am trying to do it in an asynchronous way. This doesn't seem to
> work, though. My guess is that Spark doesn't see the calls to .parallelize,
> as they are made in different tasks (or Future, therefore this code is
> called before/later or maybe with an unset Context (can it be?)). I have
> tried different ways, one of these being the call to SparkEnv.set in the
> calls to flatMap and map (in the Future). However, all I get is Cannot call
> methods on a stopped SparkContext. It just doesnt'work - maybe I just
> misunderstood what it does, therefore I removed it.
>
> This is the code I have written so far:
>
> object Fetcher {
>
>   def fetch(name, master, ...) = {
>     val externalCallOne: Future[WSResponse] = externalService1()
>     val externalCallTwo: Future[String] = externalService2()
>     // val sparkEnv = SparkEnv.get
>     val config = new SparkConf()
>     .setAppName(name)
>     .set("spark.master", master)
>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>
>     val sparkContext = new SparkContext(config)
>     //val sparkEnv = SparkEnv.get
>
>     val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
>       // SparkEnv.set(sparkEnv)
>       externalCallTwo map { dataTwo =>
>         println("in map") // prints, so it gets here ...
>         val rddOne = sparkContext.parallelize(dataOne)
>         val rddTwo = sparkContext.parallelize(dataTwo)
>         // do stuff here ... foreach/println, and
>
>         val joinedData = rddOne leftOuterJoin (rddTwo)
>       }
>     }
>     eventuallyJoinedData onSuccess { case success => ...  }
>     eventuallyJoinedData onFailure { case error =>
> println(error.getMessage) }
>     // sparkContext.stop
>   }
>
> }
> As you can see, I have also tried to comment the line to stop the context,
> but then I get another issue:
>
> 13:09:14.929 [ForkJoinPool-1-worker-5] INFO  org.apache.spark.SparkContext
> - Starting job: count at Fetcher.scala:38
> 13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop -
> Selector.select() returned prematurely because
> Thread.currentThread().interrupt() was called. Use
> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
> 13:09:14.936 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner
> - Error in cleaning thread
> java.lang.InterruptedException: null
>     at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> ~[na:1.8.0_65]
>     at
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157)
> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>     at
> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
> [spark-core_2.10-1.5.1.jar:1.5.1]
>     at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> [spark-core_2.10-1.5.1.jar:1.5.1]
>     at
> org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> [spark-core_2.10-1.5.1.jar:1.5.1]
> 13:09:14.940 [db-async-netty-thread-1] DEBUG
> io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely
> because Thread.currentThread().interrupt() was called. Use
> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
> 13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils -
> uncaught error in thread SparkListenerBus, stopping SparkContext
> java.lang.InterruptedException: null
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
> ~[na:1.8.0_65]
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> ~[na:1.8.0_65]
>     at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
> ~[na:1.8.0_65]
>     at
> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65)
> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>     at
> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>     at
> org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
> [spark-core_2.10-1.5.1.jar:1.5.1]
> 13:09:14.949 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle
> - stopping org.spark-project.jetty.server.Server@787cbcef
> 13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle
> - stopping SelectChannelConnector@0.0.0.0:4040
> 13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle
> - stopping
> org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
> As you can see, it tries to call the count operation on the RDD, but then
> it fails (possibly, because the SparkContext is null(?)).
>
> How do I address this issue? What needs to be done? Do I need to switch to
> a synchronous architecture?
>
> Thanks in advance.
>
> Kind regards,
> Marco
>
>
>

Reply via email to