The problem is in the code you are using to test:

sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();

is equivalent to the following:

def foo: Int => Nothing = {
  throw new SparkException("test")
}
sc.parallelize(List(1, 2, 3)).map(foo).collect();

So actually the Spark job is never submitted: it fails in `foo`, which is
evaluated on the driver to create the map function, before any task runs on
an executor.
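
You can see the same difference in a plain Scala REPL, without running any
job (a minimal sketch; the val names are just for illustration):

import org.apache.spark.SparkException

// A `throw` expression has type Nothing, which conforms to any function
// type, so `map(throw ...)` compiles. But the argument is evaluated
// eagerly, and the exception fires right here on the caller's side.
val eager: Int => Int = throw new SparkException("test")   // throws immediately

// A lambda defers the throw: the function value is created without
// throwing, and the exception only fires when the lambda is applied,
// which in a Spark job means inside a task on an executor.
val deferred: Int => Int = i => throw new SparkException("test")  // no exception yet
deferred(1)                                                       // throws here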

Change it to:

sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();

And you will see the correct messages from your listener.
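
Also, since your goal is to catch exceptions thrown in transformations on
executors: the failure reason is delivered to the listener through onTaskEnd.
Here is a minimal sketch (assuming the Spark 1.x listener API, where
taskEnd.reason can be matched against ExceptionFailure); paste it into
spark-shell before running the job:

import org.apache.spark.{ExceptionFailure, Success}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.reason match {
      // ExceptionFailure describes an exception thrown inside a task
      // running on an executor.
      case e: ExceptionFailure =>
        println(">>>> task failed: " + e.className + ": " + e.description)
      case Success => // task completed normally
      case other   => println(">>>> task ended: " + other)
    }
  }
});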



Best Regards,
Shixiong(Ryan) Zhu

2015-04-19 1:06 GMT+08:00 Praveen Balaji <secondorderpolynom...@gmail.com>:

> Thanks for the response, Archit. I get callbacks when I do not throw an
> exception from map.
> My use case, however, is to get callbacks for exceptions in
> transformations on executors. Do you think I'm going down the right route?
>
> Cheers
> -p
>
> On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur <archit279tha...@gmail.com> wrote:
>
>> Hi Praveen,
>> Can you try once removing the throw exception from the map? Do you still
>> not get it?
>> On Apr 18, 2015 8:14 AM, "Praveen Balaji" <secondorderpolynom...@gmail.com> wrote:
>>
>>> Thanks for the response, Imran. I probably chose the wrong methods for
>>> this email. I implemented all methods of SparkListener and the only
>>> callback I get is onExecutorMetricsUpdate.
>>>
>>> Here's the complete code:
>>>
>>> ======
>>>
>>> import org.apache.spark.scheduler._
>>>
>>>     sc.addSparkListener(new SparkListener() {
>>>       override def onStageCompleted(e: SparkListenerStageCompleted) = println(">>>> onStageCompleted");
>>>       override def onStageSubmitted(e: SparkListenerStageSubmitted) = println(">>>> onStageSubmitted");
>>>       override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart");
>>>       override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println(">>>> onTaskGettingResult");
>>>       override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd");
>>>       override def onJobStart(e: SparkListenerJobStart) = println(">>>> onJobStart");
>>>       override def onJobEnd(e: SparkListenerJobEnd) = println(">>>> onJobEnd");
>>>       override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println(">>>> onEnvironmentUpdate");
>>>       override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println(">>>> onBlockManagerAdded");
>>>       override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println(">>>> onBlockManagerRemoved");
>>>       override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println(">>>> onUnpersistRDD");
>>>       override def onApplicationStart(e: SparkListenerApplicationStart) = println(">>>> onApplicationStart");
>>>       override def onApplicationEnd(e: SparkListenerApplicationEnd) = println(">>>> onApplicationEnd");
>>>       override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println(">>>> onExecutorMetricsUpdate");
>>>     });
>>>
>>>     sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>
>>> ======
>>>
>>> On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid <iras...@cloudera.com> wrote:
>>>
>>>> When you start the spark-shell, it's already too late to get the
>>>> ApplicationStart event. Try listening for StageCompleted or JobEnd
>>>> instead.
>>>>
>>>> On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji <secondorderpolynom...@gmail.com> wrote:
>>>>
>>>>> I'm trying to create a simple SparkListener to get notified of errors
>>>>> on executors. Here is some simple code I'm executing in spark-shell,
>>>>> but I still don't get any callbacks on my listener. Am I doing
>>>>> something wrong?
>>>>>
>>>>> Thanks for any clue you can send my way.
>>>>>
>>>>> Cheers
>>>>> Praveen
>>>>>
>>>>> ======
>>>>> import org.apache.spark.scheduler.SparkListener
>>>>> import org.apache.spark.scheduler.SparkListenerApplicationStart
>>>>> import org.apache.spark.scheduler.SparkListenerApplicationEnd
>>>>> import org.apache.spark.SparkException
>>>>>
>>>>>     sc.addSparkListener(new SparkListener() {
>>>>>       override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
>>>>>         println(">>>> onApplicationStart: " + applicationStart.appName);
>>>>>       }
>>>>>
>>>>>       override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
>>>>>         println(">>>> onApplicationEnd: " + applicationEnd.time);
>>>>>       }
>>>>>     });
>>>>>
>>>>>     sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>> =======
>>>>>
>>>>> output:
>>>>>
>>>>> scala> org.apache.spark.SparkException: hshsh
>>>>>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
>>>>>         at $iwC$$iwC$$iwC.<init>(<console>:34)
>>>>>         at $iwC$$iwC.<init>(<console>:36)
>>>>>         at $iwC.<init>(<console>:38)
>>>>>
>>>>>
>>>>
>>>
>
