Re: Can't get SparkListener to work
Thanks Shixiong. I'll try this.

On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu zsxw...@gmail.com wrote:

The problem is the code you use to test:

==
sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
==

is like the following example:

==
def foo: Int => Nothing = { throw new SparkException("test") }
sc.parallelize(List(1, 2, 3)).map(foo).collect();
==

So the Spark job is actually never submitted, since it fails in `foo`, which is evaluated to create the map function. Change it to:

==
sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
==

and you will see the correct messages from your listener.

Best Regards, Shixiong (Ryan) Zhu
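To make Shixiong's point concrete, here is a minimal spark-shell sketch (a sketch only, assuming Spark 1.x with the usual implicit `sc` and a listener already registered as in the messages below; the try/catch is just to keep the shell alive):

==
import org.apache.spark.SparkException

// Throws immediately on the driver: `throw ...` is evaluated while the
// argument to map is being constructed, so no job is submitted and the
// listener sees nothing.
try {
  sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect()
} catch {
  case e: SparkException => println("failed before job submission: " + e.getMessage)
}

// Wraps the throw in a function literal: the job is submitted, tasks fail
// on the executors, and listener callbacks (onJobStart, onTaskEnd,
// onJobEnd, ...) fire as expected.
try {
  sc.parallelize(List(1, 2, 3)).map((i: Int) => throw new SparkException("test")).collect()
} catch {
  case e: SparkException => println("failed during job execution: " + e.getMessage)
}
==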
Re: Can't get SparkListener to work
Thanks for the response, Archit. I get callbacks when I do not throw an exception from map. My use case, however, is to get callbacks for exceptions thrown in transformations on executors. Do you think I'm going down the right route?

Cheers -p

On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur archit279tha...@gmail.com wrote:

Hi Praveen, can you try once removing the throw exception in map? Do you still not get it?
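For the executor-exception use case specifically, the listener route does work once a job is actually submitted: task failures are reported back to the driver and show up in onTaskEnd with an ExceptionFailure end reason. A minimal sketch (assuming Spark 1.x, where ExceptionFailure lives in org.apache.spark):

==
import org.apache.spark.ExceptionFailure
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(e: SparkListenerTaskEnd): Unit = e.reason match {
    // A task that threw on an executor reports an ExceptionFailure
    // carrying the exception class name and message.
    case ef: ExceptionFailure =>
      println("task failed on executor: " + ef.className + ": " + ef.description)
    case _ => // success or some other end reason
  }
});
==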
Can't get SparkListener to work
I'm trying to create a simple SparkListener to get notified of errors on executors. I do not get any callbacks on my SparkListener. Here is some simple code I'm executing in spark-shell, but I still don't get any callbacks on my listener. Am I doing something wrong? Thanks for any clue you can send my way.

Cheers Praveen

==
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerApplicationStart
import org.apache.spark.scheduler.SparkListenerApplicationEnd
import org.apache.spark.SparkException

sc.addSparkListener(new SparkListener() {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    println("onApplicationStart: " + applicationStart.appName);
  }
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println("onApplicationEnd: " + applicationEnd.time);
  }
});

sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
===

output:

scala> org.apache.spark.SparkException: hshsh
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
  at $iwC$$iwC$$iwC.<init>(<console>:34)
  at $iwC$$iwC.<init>(<console>:36)
  at $iwC.<init>(<console>:38)
Re: Can't get SparkListener to work
Thanks for the response, Imran. I probably chose the wrong methods for this email. I implemented all methods of SparkListener, and the only callback I get is onExecutorMetricsUpdate. Here's the complete code:

==
import org.apache.spark.scheduler._
import org.apache.spark.SparkException

sc.addSparkListener(new SparkListener() {
  override def onStageCompleted(e: SparkListenerStageCompleted) = println("onStageCompleted");
  override def onStageSubmitted(e: SparkListenerStageSubmitted) = println("onStageSubmitted");
  override def onTaskStart(e: SparkListenerTaskStart) = println("onTaskStart");
  override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println("onTaskGettingResult");
  override def onTaskEnd(e: SparkListenerTaskEnd) = println("onTaskEnd");
  override def onJobStart(e: SparkListenerJobStart) = println("onJobStart");
  override def onJobEnd(e: SparkListenerJobEnd) = println("onJobEnd");
  override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println("onEnvironmentUpdate");
  override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println("onBlockManagerAdded");
  override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println("onBlockManagerRemoved");
  override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println("onUnpersistRDD");
  override def onApplicationStart(e: SparkListenerApplicationStart) = println("onApplicationStart");
  override def onApplicationEnd(e: SparkListenerApplicationEnd) = println("onApplicationEnd");
  override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println("onExecutorMetricsUpdate");
});

sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
==

On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid iras...@cloudera.com wrote:

When you start the spark-shell, it's already too late to get the ApplicationStart event. Try listening for StageCompleted or JobEnd instead.
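As an aside on Imran's point: if application-level events are needed, a listener can be registered before the application starts via the spark.extraListeners configuration (available since Spark 1.3) instead of from inside the shell. A hypothetical sketch, assuming a listener class named MyListener with a no-arg constructor, compiled and placed on the driver classpath:

==
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

class MyListener extends SparkListener {
  // Registered at startup, so this fires even though a listener added
  // from inside the shell would miss it.
  override def onApplicationStart(e: SparkListenerApplicationStart): Unit =
    println("onApplicationStart: " + e.appName)
}
==

and then launch with something like:

==
spark-shell --conf spark.extraListeners=MyListener
==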
Database operations on executor nodes
I was wondering what people generally do about doing database operations from executor nodes. I'm (at least for now) avoiding doing database updates from executor nodes, to avoid a proliferation of database connections on the cluster. The general pattern I adopt is to collect queries (or tuples) on the executors and write to the database on the driver:

==
// Executes on the executors: collect the queries into an accumulator
rdd.foreach(s => {
  val query = s"insert into ${s}";
  accumulator += query;
});

// Executes on the driver: replay the collected queries
accumulator.value.foreach(query => {
  // get connection
  // update database
});
==

I'm obviously trading database connections for driver heap. How do other Spark users do it?

Cheers Praveen
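One common alternative (a sketch, not a drop-in; the JDBC URL, credentials, and table below are placeholder assumptions) is to write from the executors with foreachPartition, so each partition opens a single connection rather than one per record. That bounds the number of simultaneous connections at the number of concurrently running tasks and keeps the collected data off the driver heap:

==
import java.sql.DriverManager

rdd.foreachPartition { partition =>
  // One connection per partition, not per record; runs on the executor.
  val conn = DriverManager.getConnection("jdbc:postgresql://db-host/mydb", "user", "pass") // placeholders
  val stmt = conn.prepareStatement("insert into events(value) values (?)") // placeholder table
  try {
    partition.foreach { s =>
      stmt.setString(1, s.toString)
      stmt.addBatch()
    }
    stmt.executeBatch() // one round trip per partition
  } finally {
    stmt.close()
    conn.close()
  }
}
==

The trade-off versus the accumulator approach is that the database now sees concurrent writers, so it still has to tolerate up to one connection per active task.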