You need to call sc.stop() to wait for the notifications to be processed.

Best Regards,
Shixiong(Ryan) Zhu
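Applied to the TestME example quoted below, that would mean replacing the Thread.sleep(2000) in the finally block with sc.stop(). A minimal, self-contained sketch of that change (assuming local[*], as in the original test; not verified here):

======
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.scheduler._

object TestME {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testme").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.addSparkListener(new SparkListener() {
      override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart")
      override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd")
    })
    try {
      sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect()
    } finally {
      // stop() shuts down the asynchronous listener bus only after its
      // queued events are processed (the point of the suggestion above),
      // so the pending onTaskStart/onTaskEnd notifications are delivered
      // and no arbitrary sleep is needed.
      sc.stop()
    }
  }
}
======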
2015-04-21 4:18 GMT+08:00 Praveen Balaji <secondorderpolynom...@gmail.com>:

> Thanks Shixiong. I tried it out and it works.
>
> If you're looking at this post, here are a few points you may be
> interested in:
>
> It turns out this has to do with the difference between methods and
> functions in Scala, something I hadn't paid attention to before. If
> you're looking at this thread, this may be an interesting post:
>
> http://jim-mcbeath.blogspot.com/2009/05/scala-functions-vs-methods.html
>
> Below is some test code. I added the Thread.sleep because it looks like
> Spark notifications happen asynchronously and the main/driver thread
> won't wait for the notifications to complete. I'll look at that further
> later; for now that's my inference, so don't take my word for it yet.
> Here's the code:
>
> object TestME {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("testme");
>     val sc = new SparkContext(conf);
>     try {
>       foo(sc);
>     } finally {
>       Thread.sleep(2000);
>     }
>   }
>
>   def foo(sc: SparkContext) = {
>     sc.addSparkListener(new SparkListener() {
>       override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart");
>       override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd");
>     });
>
>     sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
>   }
> }
>
> I'm running it from Eclipse on local[*].
>
> On Sun, Apr 19, 2015 at 7:57 PM, Praveen Balaji
> <secondorderpolynom...@gmail.com> wrote:
>
>> Thanks Shixiong. I'll try this.
>>
>> On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu <zsxw...@gmail.com> wrote:
>>
>>> The problem is the code you use to test:
>>>
>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>
>>> is like the following example:
>>>
>>> def foo: Int => Nothing = {
>>>   throw new SparkException("test")
>>> }
>>> sc.parallelize(List(1, 2, 3)).map(foo).collect();
>>>
>>> So the Spark job is never actually submitted: the exception is thrown
>>> in `foo`, which is evaluated in order to create the map function.
>>>
>>> Change it to
>>>
>>> sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
>>>
>>> and you will see the correct messages from your listener.
>>>
>>> Best Regards,
>>> Shixiong(Ryan) Zhu
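The eager-evaluation trap described above can be reproduced without Spark at all. A plain-Scala sketch (object and exception names here are made up for illustration):

======
object EagerThrowDemo {
  def main(args: Array[String]): Unit = {
    val xs = List(1, 2, 3)

    // A bare `throw` expression has type Nothing, which conforms to
    // Int => Nothing, so map(throw ...) type-checks. But the argument
    // is evaluated *before* map runs, so this line would blow up
    // immediately, on the calling thread:
    //
    //   xs.map(throw new RuntimeException("test"))

    // A function literal defers the throw into the function body, so it
    // is only raised when map applies the function to an element:
    try xs.map((i: Int) => throw new RuntimeException("test"))
    catch { case e: RuntimeException => println("thrown inside map: " + e.getMessage) }
  }
}
======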
>>>
>>> 2015-04-19 1:06 GMT+08:00 Praveen Balaji <secondorderpolynom...@gmail.com>:
>>>
>>>> Thanks for the response, Archit. I get callbacks when I do not throw
>>>> an exception from map. My use case, however, is to get callbacks for
>>>> exceptions in transformations on executors. Do you think I'm going
>>>> down the right route?
>>>>
>>>> Cheers
>>>> -p
>>>>
>>>> On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur
>>>> <archit279tha...@gmail.com> wrote:
>>>>
>>>>> Hi Praveen,
>>>>> Can you try it once after removing the throw from map? Do you still
>>>>> not get the callbacks?
>>>>>
>>>>> On Apr 18, 2015 8:14 AM, "Praveen Balaji"
>>>>> <secondorderpolynom...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the response, Imran. I probably chose the wrong methods
>>>>>> for this email. I implemented all methods of SparkListener, and the
>>>>>> only callback I get is onExecutorMetricsUpdate.
>>>>>>
>>>>>> Here's the complete code:
>>>>>>
>>>>>> ======
>>>>>>
>>>>>> import org.apache.spark.scheduler._
>>>>>>
>>>>>> sc.addSparkListener(new SparkListener() {
>>>>>>   override def onStageCompleted(e: SparkListenerStageCompleted) = println(">>>> onStageCompleted");
>>>>>>   override def onStageSubmitted(e: SparkListenerStageSubmitted) = println(">>>> onStageSubmitted");
>>>>>>   override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart");
>>>>>>   override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println(">>>> onTaskGettingResult");
>>>>>>   override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd");
>>>>>>   override def onJobStart(e: SparkListenerJobStart) = println(">>>> onJobStart");
>>>>>>   override def onJobEnd(e: SparkListenerJobEnd) = println(">>>> onJobEnd");
>>>>>>   override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println(">>>> onEnvironmentUpdate");
>>>>>>   override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println(">>>> onBlockManagerAdded");
>>>>>>   override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println(">>>> onBlockManagerRemoved");
>>>>>>   override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println(">>>> onUnpersistRDD");
>>>>>>   override def onApplicationStart(e: SparkListenerApplicationStart) = println(">>>> onApplicationStart");
>>>>>>   override def onApplicationEnd(e: SparkListenerApplicationEnd) = println(">>>> onApplicationEnd");
>>>>>>   override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println(">>>> onExecutorMetricsUpdate");
>>>>>> });
>>>>>>
>>>>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>>>
>>>>>> ======
>>>>>>
>>>>>> On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid <iras...@cloudera.com>
>>>>>> wrote:
>>>>>>
>>>>>>> When you start the spark-shell, it's already too late to get the
>>>>>>> ApplicationStart event. Try listening for StageCompleted or JobEnd
>>>>>>> instead.
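Following that suggestion, a listener along these lines would also surface the executor-side failure itself. A sketch against the Spark 1.x listener API (the reason/jobResult fields are worth double-checking against your version), runnable in spark-shell where sc is already defined:

======
import org.apache.spark.scheduler._

sc.addSparkListener(new SparkListener() {
  // Each finished task carries a TaskEndReason; for a failed task it
  // describes the error (e.g. an ExceptionFailure).
  override def onTaskEnd(e: SparkListenerTaskEnd) =
    println(">>>> onTaskEnd, reason: " + e.reason)

  // A failed job ends with a JobFailed result wrapping the exception.
  override def onJobEnd(e: SparkListenerJobEnd) =
    e.jobResult match {
      case JobFailed(ex) => println(">>>> onJobEnd, failed: " + ex)
      case _             => println(">>>> onJobEnd, succeeded")
    }
});
======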
>>>>>>>
>>>>>>> On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji
>>>>>>> <secondorderpolynom...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm trying to create a simple SparkListener to get notified of
>>>>>>>> errors on executors, but I do not get any callbacks on my
>>>>>>>> SparkListener. Here is some simple code I'm executing in
>>>>>>>> spark-shell. Am I doing something wrong?
>>>>>>>>
>>>>>>>> Thanks for any clue you can send my way.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Praveen
>>>>>>>>
>>>>>>>> ======
>>>>>>>> import org.apache.spark.scheduler.SparkListener
>>>>>>>> import org.apache.spark.scheduler.SparkListenerApplicationStart
>>>>>>>> import org.apache.spark.scheduler.SparkListenerApplicationEnd
>>>>>>>> import org.apache.spark.SparkException
>>>>>>>>
>>>>>>>> sc.addSparkListener(new SparkListener() {
>>>>>>>>   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
>>>>>>>>     println(">>>> onApplicationStart: " + applicationStart.appName);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
>>>>>>>>     println(">>>> onApplicationEnd: " + applicationEnd.time);
>>>>>>>>   }
>>>>>>>> });
>>>>>>>>
>>>>>>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>>>>> ======
>>>>>>>>
>>>>>>>> output:
>>>>>>>>
>>>>>>>> scala> org.apache.spark.SparkException: hshsh
>>>>>>>>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
>>>>>>>>         at $iwC$$iwC$$iwC.<init>(<console>:34)
>>>>>>>>         at $iwC$$iwC.<init>(<console>:36)
>>>>>>>>         at $iwC.<init>(<console>:38)
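One more note on the ApplicationStart part of the original question: by the time spark-shell hands you sc, the context has already started, which is why it is too late for that event. If the Spark version in use has the spark.extraListeners setting (added around Spark 1.3, as far as I know), a listener class can be registered before the context starts and will then see onApplicationStart. A rough sketch; the package and class name are placeholders:

======
// Compile into a jar on the driver classpath and register it with e.g.
//   spark-shell --conf spark.extraListeners=com.example.ErrorListener
// Spark instantiates the class (zero-arg constructor) while the
// SparkContext starts up, early enough to observe onApplicationStart.
package com.example

import org.apache.spark.scheduler._

class ErrorListener extends SparkListener {
  override def onApplicationStart(e: SparkListenerApplicationStart) =
    println(">>>> onApplicationStart: " + e.appName)
  override def onApplicationEnd(e: SparkListenerApplicationEnd) =
    println(">>>> onApplicationEnd: " + e.time)
}
======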