Thanks Shixiong. I'll try this.

On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu <zsxw...@gmail.com> wrote:

> The problem is the code you use to test:
>
>
> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>
> is like the following example:
>
> def foo: Int => Nothing = {
>   throw new SparkException("test")
> }
> sc.parallelize(List(1, 2, 3)).map(foo).collect();
>
> So the Spark job is never actually submitted: the failure happens in
> `foo`, which is evaluated on the driver to produce the map function.
>
> Change it to
>
> sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
>
> And you will see the correct messages from your listener.
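>
> To make it concrete, here is a minimal sketch of what the failing
> version effectively does (`f` is a hypothetical name; paste into
> spark-shell with `sc` in scope):
>
> import org.apache.spark.SparkException
>
> // The argument to map is evaluated eagerly on the driver: `throw ...`
> // has type Nothing, which conforms to Int => Nothing, so this compiles
> // but throws before map is ever called and before any job is submitted.
> val f: Int => Nothing = throw new SparkException("test") // throws here, on the driver
> sc.parallelize(List(1, 2, 3)).map(f).collect();          // never reached
>
> With `i => throw ...`, the exception is instead thrown inside the tasks
> on the executors, after the job has been submitted.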
>
>
>
> Best Regards,
> Shixiong(Ryan) Zhu
>
> 2015-04-19 1:06 GMT+08:00 Praveen Balaji <secondorderpolynom...@gmail.com>:
>
>> Thanks for the response, Archit. I get callbacks when I do not throw an
>> exception from map.
>> My use case, however, is to get callbacks for exceptions in
>> transformations on executors. Do you think I'm going down the right route?
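>>
>> Concretely, what I'm imagining is inspecting the task end reason,
>> along these lines (untested sketch; ExceptionFailure is the end reason
>> in org.apache.spark that carries the exception details):
>>
>> import org.apache.spark.ExceptionFailure
>> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>>
>> sc.addSparkListener(new SparkListener() {
>>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = taskEnd.reason match {
>>     // ExceptionFailure carries the class name, message, and stack
>>     // trace of the exception thrown inside the task on the executor.
>>     case e: ExceptionFailure => println(">>>> task failed: " + e.description)
>>     case _ => // Success, TaskKilled, etc.
>>   }
>> });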
>>
>> Cheers
>> -p
>>
>> On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur <archit279tha...@gmail.com> wrote:
>>
>>> Hi Praveen,
>>> Can you try once removing the throw from the map? Do you still not
>>> get the callbacks?
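>>> For instance, with a hypothetical stand-in like:
>>>
>>> sc.parallelize(List(1, 2, 3)).map(i => i + 1).collect();
>>>
>>> If the listener fires for that, the throw itself is the problem.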
>>> On Apr 18, 2015 8:14 AM, "Praveen Balaji" <secondorderpolynom...@gmail.com> wrote:
>>>
>>>> Thanks for the response, Imran. I probably chose the wrong methods for
>>>> this email. I implemented all methods of SparkListener and the only
>>>> callback I get is onExecutorMetricsUpdate.
>>>>
>>>> Here's the complete code:
>>>>
>>>> ======
>>>>
>>>> import org.apache.spark.scheduler._
>>>>
>>>>     sc.addSparkListener(new SparkListener() {
>>>>       override def onStageCompleted(e: SparkListenerStageCompleted) = println(">>>> onStageCompleted");
>>>>       override def onStageSubmitted(e: SparkListenerStageSubmitted) = println(">>>> onStageSubmitted");
>>>>       override def onTaskStart(e: SparkListenerTaskStart) = println(">>>> onTaskStart");
>>>>       override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println(">>>> onTaskGettingResult");
>>>>       override def onTaskEnd(e: SparkListenerTaskEnd) = println(">>>> onTaskEnd");
>>>>       override def onJobStart(e: SparkListenerJobStart) = println(">>>> onJobStart");
>>>>       override def onJobEnd(e: SparkListenerJobEnd) = println(">>>> onJobEnd");
>>>>       override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println(">>>> onEnvironmentUpdate");
>>>>       override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println(">>>> onBlockManagerAdded");
>>>>       override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println(">>>> onBlockManagerRemoved");
>>>>       override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println(">>>> onUnpersistRDD");
>>>>       override def onApplicationStart(e: SparkListenerApplicationStart) = println(">>>> onApplicationStart");
>>>>       override def onApplicationEnd(e: SparkListenerApplicationEnd) = println(">>>> onApplicationEnd");
>>>>       override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println(">>>> onExecutorMetricsUpdate");
>>>>     });
>>>>
>>>>     sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>
>>>> ======
>>>>
>>>> On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid <iras...@cloudera.com> wrote:
>>>>
>>>>> When you start the spark-shell, it's already too late to get the
>>>>> ApplicationStart event. Try listening for StageCompleted or JobEnd
>>>>> instead.
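>>>>>
>>>>> For example, a minimal sketch using JobEnd (listener API as of Spark
>>>>> 1.3; field names may differ in other versions):
>>>>>
>>>>> import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, JobFailed}
>>>>>
>>>>> sc.addSparkListener(new SparkListener() {
>>>>>   override def onJobEnd(jobEnd: SparkListenerJobEnd) = jobEnd.jobResult match {
>>>>>     // JobFailed carries the exception that aborted the job.
>>>>>     case JobFailed(e) => println(">>>> job " + jobEnd.jobId + " failed: " + e)
>>>>>     case _            => println(">>>> job " + jobEnd.jobId + " finished")
>>>>>   }
>>>>> });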
>>>>>
>>>>> On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji <secondorderpolynom...@gmail.com> wrote:
>>>>>
>>>>>> I'm trying to create a simple SparkListener to get notified of errors
>>>>>> on executors, but I do not get any callbacks on my listener. Here is
>>>>>> some simple code I'm executing in spark-shell. Am I doing something
>>>>>> wrong?
>>>>>>
>>>>>> Thanks for any clue you can send my way.
>>>>>>
>>>>>> Cheers
>>>>>> Praveen
>>>>>>
>>>>>> ======
>>>>>> import org.apache.spark.scheduler.SparkListener
>>>>>> import org.apache.spark.scheduler.SparkListenerApplicationStart
>>>>>> import org.apache.spark.scheduler.SparkListenerApplicationEnd
>>>>>> import org.apache.spark.SparkException
>>>>>>
>>>>>>     sc.addSparkListener(new SparkListener() {
>>>>>>       override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
>>>>>>         println(">>>> onApplicationStart: " + applicationStart.appName);
>>>>>>       }
>>>>>>
>>>>>>       override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
>>>>>>         println(">>>> onApplicationEnd: " + applicationEnd.time);
>>>>>>       }
>>>>>>     });
>>>>>>
>>>>>>     sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>>> ======
>>>>>>
>>>>>> output:
>>>>>>
>>>>>> scala> org.apache.spark.SparkException: hshsh
>>>>>>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
>>>>>>         at $iwC$$iwC$$iwC.<init>(<console>:34)
>>>>>>         at $iwC$$iwC.<init>(<console>:36)
>>>>>>         at $iwC.<init>(<console>:38)
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
