1. You need checkpointing mostly for recovering from driver failures, and in some cases for stateful operations (see the sketch below).
2. Could you try not using the SPARK_CLASSPATH environment variable?

TD
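For (1), the usual recovery pattern is to build the StreamingContext through StreamingContext.getOrCreate, so that after a driver failure the context (and the whole DStream pipeline) is rebuilt from the checkpoint data. A minimal sketch; the checkpoint path and app name are placeholders, and the pipeline setup has to live inside the creating function:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedPoC {
  val checkpointDir = "hdfs:///tmp/poc-checkpoint"  // placeholder; any fault-tolerant store works

  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("PoC Logs")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... define the DStream pipeline here, so it can be restored on recovery ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: calls createContext(). After a driver failure: rebuilds the
    // context and the pipeline from the data found in checkpointDir.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}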
On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz <konstt2...@gmail.com> wrote:

> I don't have any checkpoint in my code. Really, I don't have to save any
> state; it's just log processing for a PoC.
> I had been testing the code in a VM from Cloudera and I never got that
> error. Now it's a real cluster.
>
> The command to execute Spark:
>
> spark-submit --name "PoC Logs" --master yarn-client --class
> com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
> /usr/metrics/ex/metrics-spark.jar $1 $2 $3
>
> val sparkConf = new SparkConf()
> val ssc = new StreamingContext(sparkConf, Seconds(5))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
> val topics = args(1).split("\\,")
> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>
> directKafkaStream.foreachRDD { rdd =>
>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
>     .....
>   }
> }
>
> I understand that I just need a checkpoint if I need to recover the task
> if something goes wrong, right?
>
> 2015-06-27 9:39 GMT+02:00 Tathagata Das <t...@databricks.com>:
>
>> How are you trying to execute the code again? From checkpoints, or
>> otherwise?
>> Also cc'ed Hari, who may have a better idea of YARN-related issues.
>>
>> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz <konstt2...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm executing a Spark Streaming job with Kafka. The code was working,
>>> but today I tried to execute it again and I got an exception; I don't
>>> know what's happening. Right now there are no job executions on YARN.
>>> How can I fix it?
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>>> application has already ended! It might have been killed or unable to
>>> launch application master.
>>> at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
>>> at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>>> at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>>> at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
>>> at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
>>> at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
>>> at com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
>>> at com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> 15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete Spark local dirs
>>> java.lang.NullPointerException
>>> at org.apache.spark.storage.DiskBlockManager.org$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>>> at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>>> at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>> at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>>> at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
>>> Exception in thread "delete Spark local dirs" java.lang.NullPointerException
>>> at org.apache.spark.storage.DiskBlockManager.org$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>>> at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>>> at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>> at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>>> at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
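For reference, the direct-stream snippet quoted above, fleshed out into a self-contained sketch. The per-partition processing is a placeholder, since the original mail elides it; the object name is made up:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object MetricsSparkSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("PoC Logs")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
    val topics = args(1).split(",").toSet

    val directKafkaStream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    directKafkaStream.foreachRDD { rdd =>
      // The cast must be done on the RDD the stream hands you, before any
      // transformation, because only that RDD carries the offset metadata.
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsets.foreach { o =>
        println(s"${o.topic}/${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
      }

      rdd.mapPartitionsWithIndex { (i, kafkaEvents) =>
        // placeholder for the per-partition processing elided in the mail
        kafkaEvents.map { case (_, value) => s"partition $i: $value" }
      }.count()  // an action is needed so the job actually runs
    }

    ssc.start()
    ssc.awaitTermination()
  }
}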