Thanks Shixiong. I'll try this. On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu <zsxw...@gmail.com> wrote:
> The problem is the code you use to test:
>
> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>
> is like the following example:
>
> def foo: Int => Nothing = {
>   throw new SparkException("test")
> }
> sc.parallelize(List(1, 2, 3)).map(foo).collect();
>
> So the Spark job is never actually submitted: it fails in `foo`, which is
> evaluated to create the map function.
>
> Change it to
>
> sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
>
> And you will see the correct messages from your listener.
>
> Best Regards,
> Shixiong(Ryan) Zhu
>
> 2015-04-19 1:06 GMT+08:00 Praveen Balaji <secondorderpolynom...@gmail.com>:
>
>> Thanks for the response, Archit. I get callbacks when I do not throw an
>> exception from map.
>> My use case, however, is to get callbacks for exceptions in
>> transformations on executors. Do you think I'm going down the right route?
>>
>> Cheers
>> -p
>>
>> On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur <archit279tha...@gmail.com>
>> wrote:
>>
>>> Hi Praveen,
>>> Can you try once with the throw removed from map? Do you still not get
>>> the callbacks?
>>> On Apr 18, 2015 8:14 AM, "Praveen Balaji" <secondorderpolynom...@gmail.com> wrote:
>>>
>>>> Thanks for the response, Imran. I probably chose the wrong methods for
>>>> this email. I implemented all methods of SparkListener and the only
>>>> callback I get is onExecutorMetricsUpdate.
>>>>
>>>> Here's the complete code:
>>>>
>>>> ======
>>>>
>>>> import org.apache.spark.SparkException
>>>> import org.apache.spark.scheduler._
>>>>
>>>> sc.addSparkListener(new SparkListener() {
>>>>   override def onStageCompleted(e: SparkListenerStageCompleted) = println(">>>> onStageCompleted");
>>>>   override def onStageSubmitted(e: SparkListenerStageSubmitted) = println(">>>> onStageSubmitted");
>>>>   override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart");
>>>>   override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println(">>>> onTaskGettingResult");
>>>>   override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd");
>>>>   override def onJobStart(e: SparkListenerJobStart) = println(">>>> onJobStart");
>>>>   override def onJobEnd(e: SparkListenerJobEnd) = println(">>>> onJobEnd");
>>>>   override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println(">>>> onEnvironmentUpdate");
>>>>   override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println(">>>> onBlockManagerAdded");
>>>>   override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println(">>>> onBlockManagerRemoved");
>>>>   override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println(">>>> onUnpersistRDD");
>>>>   override def onApplicationStart(e: SparkListenerApplicationStart) = println(">>>> onApplicationStart");
>>>>   override def onApplicationEnd(e: SparkListenerApplicationEnd) = println(">>>> onApplicationEnd");
>>>>   override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println(">>>> onExecutorMetricsUpdate");
>>>> });
>>>>
>>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>
>>>> =====
>>>>
>>>> On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid <iras...@cloudera.com>
>>>> wrote:
>>>>
>>>>> When you start the spark-shell, it's already too late to get the
>>>>> ApplicationStart event. Try listening for StageCompleted or JobEnd
>>>>> instead.
>>>>>
>>>>> On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji <secondorderpolynom...@gmail.com> wrote:
>>>>>
>>>>>> I'm trying to create a simple SparkListener to get notified of errors
>>>>>> on executors. Here is some simple code I'm executing in spark-shell,
>>>>>> but I don't get any callbacks on my listener. Am I doing something
>>>>>> wrong?
>>>>>>
>>>>>> Thanks for any clue you can send my way.
>>>>>>
>>>>>> Cheers
>>>>>> Praveen
>>>>>>
>>>>>> ======
>>>>>> import org.apache.spark.scheduler.SparkListener
>>>>>> import org.apache.spark.scheduler.SparkListenerApplicationStart
>>>>>> import org.apache.spark.scheduler.SparkListenerApplicationEnd
>>>>>> import org.apache.spark.SparkException
>>>>>>
>>>>>> sc.addSparkListener(new SparkListener() {
>>>>>>   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
>>>>>>     println(">>>> onApplicationStart: " + applicationStart.appName);
>>>>>>   }
>>>>>>
>>>>>>   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
>>>>>>     println(">>>> onApplicationEnd: " + applicationEnd.time);
>>>>>>   }
>>>>>> });
>>>>>>
>>>>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>>> =======
>>>>>>
>>>>>> output:
>>>>>>
>>>>>> scala> org.apache.spark.SparkException: hshsh
>>>>>>   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
>>>>>>   at $iwC$$iwC$$iwC.<init>(<console>:34)
>>>>>>   at $iwC$$iwC.<init>(<console>:36)
>>>>>>   at $iwC.<init>(<console>:38)
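
For anyone reading this thread later, the point Shixiong makes can be reproduced without Spark. The sketch below is only an illustration: `FakeRdd` is a hypothetical stand-in for an RDD (it is not Spark API), and `RuntimeException` stands in for `SparkException`. It shows that `map(throw ...)` evaluates the `throw` while building the argument, so `map` is never entered, whereas `map(i => throw ...)` passes a function and the exception only surfaces once `map` applies it.

```scala
object EagerThrowDemo {
  // Hypothetical stand-in for an RDD: records whether `map` was entered.
  final class FakeRdd[A](xs: List[A]) {
    var mapEntered = false
    def map[B](f: A => B): List[B] = { mapEntered = true; xs.map(f) }
  }

  // `throw` is the argument expression itself, so it is evaluated
  // before `map` is called. Returns whether `map` was entered.
  def throwAsArgument(): Boolean = {
    val rdd = new FakeRdd(List(1, 2, 3))
    try rdd.map(throw new RuntimeException("test"))
    catch { case _: RuntimeException => () }
    rdd.mapEntered
  }

  // The lambda is only a value here; the throw happens when `map`
  // applies it to the first element. Returns whether `map` was entered.
  def throwInsideLambda(): Boolean = {
    val rdd = new FakeRdd(List(1, 2, 3))
    try rdd.map(i => throw new RuntimeException("test"))
    catch { case _: RuntimeException => () }
    rdd.mapEntered
  }

  def main(args: Array[String]): Unit = {
    println("throw as argument, map entered: " + throwAsArgument())     // false
    println("throw inside lambda, map entered: " + throwInsideLambda()) // true
  }
}
```

In the real spark-shell case the same reasoning applies: with the first form nothing reaches the scheduler, so no job, stage, or task events can be delivered to the listener.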