You need to call sc.stop() to wait for the notifications to be processed.
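
For example, in the test code you posted, something like this in place of the
Thread.sleep should work (an untested sketch):

    try {
      foo(sc);
    } finally {
      // per the note above, stop() returns only after the queued listener
      // events have been delivered, so the printlns show up before exit
      sc.stop();
    }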

Best Regards,
Shixiong(Ryan) Zhu

2015-04-21 4:18 GMT+08:00 Praveen Balaji <secondorderpolynom...@gmail.com>:

> Thanks Shixiong. I tried it out and it works.
>
> If you're looking at this post, here are a few points you may be interested in:
>
> It turns out this has to do with the difference between methods and
> functions in Scala - something I hadn't paid attention to before. If you're
> looking at this thread, this may be an interesting post:
>
> http://jim-mcbeath.blogspot.com/2009/05/scala-functions-vs-methods.html
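>
> As a hypothetical illustration of the distinction (the names here are mine,
> not from the post): a def is a method and only becomes a function value when
> the compiler eta-expands it, whereas a lambda already is one:
>
>   def m(i: Int): Int = i + 1             // a method
>   val f: Int => Int = (i: Int) => i + 1  // a function value
>   List(1, 2, 3).map(m)                   // method eta-expanded to a function here
>   List(1, 2, 3).map(f)                   // passed as-is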
>
> Below is some test code. I added the Thread.sleep because it looks like
> Spark notifications happen asynchronously and the main/driver thread won't
> wait for the notifications to complete. I'll look into that further later,
> but for now that's my inference, so don't take my word for it yet. Here's
> the code:
>
> import org.apache.spark.{SparkConf, SparkContext, SparkException}
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
>
> object TestME {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("testme");
>     val sc = new SparkContext(conf);
>     try {
>       foo(sc);
>     } finally {
>       Thread.sleep(2000);
>     }
>   }
>
>   def foo(sc: SparkContext) = {
>     sc.addSparkListener(new SparkListener() {
>       override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart");
>       override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd");
>     });
>
>     sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
>   }
> }
>
> I'm running it from Eclipse on local[*].
>
>
>
> On Sun, Apr 19, 2015 at 7:57 PM, Praveen Balaji <
> secondorderpolynom...@gmail.com> wrote:
>
>> Thanks Shixiong. I'll try this.
>>
>> On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu <zsxw...@gmail.com> wrote:
>>
>>> The problem is the code you use to test:
>>>
>>>
>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>
>>> is like the following example:
>>>
>>> def foo: Int => Nothing = {
>>>   throw new SparkException("test")
>>> }
>>> sc.parallelize(List(1, 2, 3)).map(foo).collect();
>>>
>>> So the Spark job is actually never submitted, because it fails in `foo`,
>>> which is evaluated to create the map function.
>>>
>>> Change it to
>>>
>>> sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
>>>
>>> And you will see the correct messages from your listener.
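>>>
>>> To make it concrete, here are the same two lines again, just annotated:
>>>
>>> // the argument is evaluated eagerly on the driver, so the exception is
>>> // thrown before map() even receives a function and no job is submitted:
>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>
>>> // the lambda defers the throw into the task body, so it fails on the
>>> // executors and your listener receives the task/stage/job events:
>>> sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();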
>>>
>>>
>>>
>>> Best Regards,
>>> Shixiong(Ryan) Zhu
>>>
>>> 2015-04-19 1:06 GMT+08:00 Praveen Balaji <
>>> secondorderpolynom...@gmail.com>:
>>>
>>>> Thanks for the response, Archit. I get callbacks when I do not throw an
>>>> exception from map.
>>>> My use case, however, is to get callbacks for exceptions in
>>>> transformations on executors. Do you think I'm going down the right route?
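>>>>
>>>> For what it's worth, this is roughly what I have in mind (an untested
>>>> sketch; I'm assuming the task end reason carries the exception details):
>>>>
>>>> import org.apache.spark.ExceptionFailure
>>>> import org.apache.spark.scheduler._
>>>>
>>>> sc.addSparkListener(new SparkListener() {
>>>>   override def onTaskEnd(e: SparkListenerTaskEnd) = e.reason match {
>>>>     case ex: ExceptionFailure =>
>>>>       // a task on an executor died with an exception
>>>>       println(">>>> task failed: " + ex.className + ": " + ex.description)
>>>>     case _ => // succeeded, or failed for some other reason
>>>>   }
>>>> });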
>>>>
>>>> Cheers
>>>> -p
>>>>
>>>> On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur <
>>>> archit279tha...@gmail.com> wrote:
>>>>
>>>>> Hi Praveen,
>>>>> Can you try removing the thrown exception in map once? Do you still not
>>>>> get the callbacks?
>>>>> On Apr 18, 2015 8:14 AM, "Praveen Balaji" <
>>>>> secondorderpolynom...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the response, Imran. I probably chose the wrong methods
>>>>>> for this email. I implemented all methods of SparkListener and the only
>>>>>> callback I get is onExecutorMetricsUpdate.
>>>>>>
>>>>>> Here's the complete code:
>>>>>>
>>>>>> ======
>>>>>>
>>>>>> import org.apache.spark.scheduler._
>>>>>>
>>>>>>     sc.addSparkListener(new SparkListener() {
>>>>>>       override def onStageCompleted(e: SparkListenerStageCompleted) =
>>>>>>         println(">>>> onStageCompleted");
>>>>>>       override def onStageSubmitted(e: SparkListenerStageSubmitted) =
>>>>>>         println(">>>> onStageSubmitted");
>>>>>>       override def onTaskStart(e: SparkListenerTaskStart) =
>>>>>>         println(">>>> onTaskStart");
>>>>>>       override def onTaskGettingResult(e: SparkListenerTaskGettingResult) =
>>>>>>         println(">>>> onTaskGettingResult");
>>>>>>       override def onTaskEnd(e: SparkListenerTaskEnd) =
>>>>>>         println(">>>> onTaskEnd");
>>>>>>       override def onJobStart(e: SparkListenerJobStart) =
>>>>>>         println(">>> onJobStart");
>>>>>>       override def onJobEnd(e: SparkListenerJobEnd) =
>>>>>>         println(">>>> onJobEnd");
>>>>>>       override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) =
>>>>>>         println(">>>> onEnvironmentUpdate");
>>>>>>       override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) =
>>>>>>         println(">>>> onBlockManagerAdded");
>>>>>>       override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) =
>>>>>>         println(">>>> onBlockManagerRemoved");
>>>>>>       override def onUnpersistRDD(e: SparkListenerUnpersistRDD) =
>>>>>>         println(">>>> onUnpersistRDD");
>>>>>>       override def onApplicationStart(e: SparkListenerApplicationStart) =
>>>>>>         println(">>>> onApplicationStart");
>>>>>>       override def onApplicationEnd(e: SparkListenerApplicationEnd) =
>>>>>>         println(">>>> onApplicationEnd");
>>>>>>       override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) =
>>>>>>         println(">>>> onExecutorMetricsUpdate");
>>>>>>     });
>>>>>>
>>>>>>     sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>>>
>>>>>> =====
>>>>>>
>>>>>> On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid <iras...@cloudera.com>
>>>>>> wrote:
>>>>>>
>>>>>>> When you start the spark-shell, it's already too late to get the
>>>>>>> ApplicationStart event. Try listening for StageCompleted or JobEnd
>>>>>>> instead.
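>>>>>>>
>>>>>>> Something like this in the shell should print (a rough, untested sketch):
>>>>>>>
>>>>>>> sc.addSparkListener(new org.apache.spark.scheduler.SparkListener() {
>>>>>>>   override def onJobEnd(e: org.apache.spark.scheduler.SparkListenerJobEnd) =
>>>>>>>     println(">>>> onJobEnd: " + e.jobId)
>>>>>>> });
>>>>>>> sc.parallelize(1 to 3).count()  // runs a job, so the listener fires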
>>>>>>>
>>>>>>> On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji <
>>>>>>> secondorderpolynom...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm trying to create a simple SparkListener to get notified of
>>>>>>>> errors on executors, but I do not get any callbacks on my
>>>>>>>> SparkListener. Here is some simple code I'm executing in spark-shell;
>>>>>>>> I still don't get any callbacks on my listener. Am I doing something
>>>>>>>> wrong?
>>>>>>>>
>>>>>>>> Thanks for any clue you can send my way.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Praveen
>>>>>>>>
>>>>>>>> ======
>>>>>>>> import org.apache.spark.scheduler.SparkListener
>>>>>>>> import org.apache.spark.scheduler.SparkListenerApplicationStart
>>>>>>>> import org.apache.spark.scheduler.SparkListenerApplicationEnd
>>>>>>>> import org.apache.spark.SparkException
>>>>>>>>
>>>>>>>>     sc.addSparkListener(new SparkListener() {
>>>>>>>>       override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
>>>>>>>>         println(">>>> onApplicationStart: " + applicationStart.appName);
>>>>>>>>       }
>>>>>>>>
>>>>>>>>       override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
>>>>>>>>         println(">>>> onApplicationEnd: " + applicationEnd.time);
>>>>>>>>       }
>>>>>>>>     });
>>>>>>>>
>>>>>>>>     sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>>>>> =======
>>>>>>>>
>>>>>>>> output:
>>>>>>>>
>>>>>>>> scala> org.apache.spark.SparkException: hshsh
>>>>>>>>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
>>>>>>>>         at $iwC$$iwC$$iwC.<init>(<console>:34)
>>>>>>>>         at $iwC$$iwC.<init>(<console>:36)
>>>>>>>>         at $iwC.<init>(<console>:38)
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>
