Calling SparkContext methods in scala Future

2016-01-18 Thread Marco
Hello,

I am using Spark 1.5.1 with SBT and Scala 2.10.6, and I am facing an
issue with the SparkContext.

Basically, I have an object that needs to do several things:

- call an external service One (a web API)
- call an external service Two (another API)
- read data from HDFS and produce an RDD from it (Spark)
- parallelize the data obtained from the first two calls
- join these different RDDs, do stuff with them...

Now, I am trying to do this asynchronously, but it doesn't seem to work.
My guess is that Spark doesn't see the calls to .parallelize because they
are made in different tasks (Futures), so this code runs earlier/later than
expected, or maybe with an unset context (can that happen?). I have tried
different approaches, one of them being a call to SparkEnv.set inside the
flatMap and map callbacks (i.e. inside the Future). However, all I get is
"Cannot call methods on a stopped SparkContext". It just doesn't work -
maybe I simply misunderstood what SparkEnv.set does, so I removed it.

This is the code I have written so far:

object Fetcher {

  def fetch(name, master, ...) = {
    val externalCallOne: Future[WSResponse] = externalService1()
    val externalCallTwo: Future[String] = externalService2()
    // val sparkEnv = SparkEnv.get
    val config = new SparkConf()
      .setAppName(name)
      .set("spark.master", master)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sparkContext = new SparkContext(config)
    // val sparkEnv = SparkEnv.get

    val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
      // SparkEnv.set(sparkEnv)
      externalCallTwo map { dataTwo =>
        println("in map") // prints, so it gets here ...
        val rddOne = sparkContext.parallelize(dataOne)
        val rddTwo = sparkContext.parallelize(dataTwo)
        // do stuff here ... foreach/println, and

        val joinedData = rddOne leftOuterJoin (rddTwo)
      }
    }
    eventuallyJoinedData onSuccess { case success => ... }
    eventuallyJoinedData onFailure { case error => println(error.getMessage) }
    // sparkContext.stop
  }

}
As you can see, I have also tried commenting out the line that stops the
context, but then I get another issue:

13:09:14.929 [ForkJoinPool-1-worker-5] INFO  org.apache.spark.SparkContext - Starting job: count at Fetcher.scala:38
13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
13:09:14.936 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner - Error in cleaning thread
java.lang.InterruptedException: null
  at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) ~[na:1.8.0_65]
  at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157) ~[spark-core_2.10-1.5.1.jar:1.5.1]
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) [spark-core_2.10-1.5.1.jar:1.5.1]
  at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154) [spark-core_2.10-1.5.1.jar:1.5.1]
  at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67) [spark-core_2.10-1.5.1.jar:1.5.1]
13:09:14.940 [db-async-netty-thread-1] DEBUG io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils - uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException: null
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[na:1.8.0_65]
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[na:1.8.0_65]
  at java.util.concurrent.Semaphore.acquire(Semaphore.java:312) ~[na:1.8.0_65]
  at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65) ~[spark-core_2.10-1.5.1.jar:1.5.1]
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) ~[spark-core_2.10-1.5.1.jar:1.5.1]
  at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) [spark-core_2.10-1.5.1.jar:1.5.1]
13:09:14.949 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping org.spark-project.jetty.server.Server@787cbcef
13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping SelectChannelConnector@0.0.0.0:4040
13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
As you can see, it tries to run the count operation on the RDD, but then it
fails (possibly because the SparkContext is null?).

How do I address this issue? What needs to be done? Do I need to switch to a
synchronous architecture?

Thanks in advance.

Kind regards,
Marco 

PS: I apologize if I have already sent this message to the mailing list,
but I have only just found out that Nabble can also be used to write
(normally I only use it to read). I am not entirely sure how all this is
handled, but I see many topics in the mailing list that are not approved
yet - so I suppose this is the "way". If not, please just delete this message.





Re: Calling SparkContext methods in scala Future

2016-01-18 Thread Ted Yu
  externalCallTwo map { dataTwo =>
    println("in map") // prints, so it gets here ...
    val rddOne = sparkContext.parallelize(dataOne)

I don't think you should call methods on sparkContext inside a map function.
sparkContext lives on the driver side.
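
For example, something along these lines (just a sketch; toPairs is a
hypothetical helper that turns each external result into a Seq[(K, V)]):

import scala.concurrent.Await
import scala.concurrent.duration._

// Resolve the external calls on the driver thread first ...
val (dataOne, dataTwo) =
  Await.result(externalCallOne zip externalCallTwo, 30.seconds)

// ... then use the SparkContext synchronously, outside any Future.
val rddOne = sparkContext.parallelize(toPairs(dataOne))
val rddTwo = sparkContext.parallelize(toPairs(dataTwo))
val joinedData = rddOne leftOuterJoin rddTwo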

Cheers

On Mon, Jan 18, 2016 at 6:27 AM, Marco  wrote:


Re: Calling SparkContext methods in scala Future

2016-01-18 Thread Shixiong(Ryan) Zhu
Hey Marco,

Since the code in the Future runs asynchronously, you cannot call
"sparkContext.stop" at the end of "fetch", because the code in the Future
may not have finished yet.
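
If you do need to stop the context inside "fetch", one option (just a
sketch) is to stop it only once the future has completed, e.g.:

import scala.concurrent.ExecutionContext.Implicits.global // or your own EC
import scala.util.{Failure, Success}

eventuallyJoinedData onComplete {
  case Success(_) =>
    // the async work is done, so it is safe to release the context now
    sparkContext.stop()
  case Failure(error) =>
    println(error.getMessage)
    sparkContext.stop()
}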

However, the exception seems weird. Do you have a simple reproducer?


On Mon, Jan 18, 2016 at 9:13 AM, Ted Yu  wrote:


Re: Calling SparkContext methods in scala Future

2016-01-19 Thread Marco
Thank you guys for the answers.

@Ted Yu: You are right; in general, the code that fetches data from the
external services should run separately, and Spark should only access the
data written by those two services via flume/kafka/whatever. However,
before I get there, I would like to have the Spark job ready.

@Shixiong Zhu: I imagined something like that, and I must say that from the
beginning I suspected that SparkContext cannot be called from Futures in
general. It seems I was right about that, although I tried it anyway and
got the confirmation I needed. Unfortunately, I don't have a reproducer,
but I would say that it's enough to create a single Future and call
sparkContext from there.
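
Something along these lines should be enough to show it (untested sketch):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.{SparkConf, SparkContext}

object Reproducer extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("repro").setMaster("local[2]"))

  // kick off a Spark job inside a Future ...
  val job = Future { sc.parallelize(1 to 10).count() }

  // ... while the driver stops the context before the Future has finished
  sc.stop()

  Await.ready(job, 30.seconds)
}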

Thanks again for the answers.

Kind regards,
Marco

2016-01-18 19:37 GMT+01:00 Shixiong(Ryan) Zhu :
