Well, SPARK_CLASSPATH is just an arbitrary name. The complete script is this:

export HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_CLASSPATH="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
for lib in /usr/metrics/lib/*.jar
do
  if [ -z "$SPARK_CLASSPATH" ]; then
    SPARK_CLASSPATH=$lib
  else
    SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
  fi
done
spark-submit --name "Metrics"....
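One way to follow Tathagata's suggestion (quoted further down) of not using SPARK_CLASSPATH is to build the same comma-separated list under a different name. Spark 1.x also recognizes a deprecated SPARK_CLASSPATH environment variable of its own, so reusing that name for a `--jars` list risks mixing the two up if it is ever exported. This is a minimal sketch, not a confirmed fix for the YARN error: the variable names METRICS_CONF, METRICS_JARS and LIB_JARS and the helper `build_jar_list` are hypothetical, and the list keeps the same entries as the original script.

```shell
#!/usr/bin/env bash
# Sketch: build the comma-separated list that spark-submit --jars expects,
# using hypothetical variable names instead of SPARK_CLASSPATH.

# Print every *.jar in the given directory, joined with commas.
build_jar_list() {
  local dir="$1" list="" jar
  for jar in "$dir"/*.jar; do
    [ -e "$jar" ] || continue   # skip the literal pattern when nothing matches
    if [ -z "$list" ]; then
      list="$jar"
    else
      list="$list,$jar"
    fi
  done
  printf '%s\n' "$list"
}

# Same properties files and yarn-conf entry as the original script,
# followed by every jar found under /usr/metrics/lib.
METRICS_CONF="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
METRICS_JARS="$METRICS_CONF"
LIB_JARS=$(build_jar_list /usr/metrics/lib)
if [ -n "$LIB_JARS" ]; then
  METRICS_JARS="$METRICS_JARS,$LIB_JARS"
fi

# Then pass the list explicitly, e.g.:
#   spark-submit --name "PoC Logs" --master yarn-client \
#     --class com.metrics.MetricsSpark --jars "$METRICS_JARS" \
#     --executor-memory 1g /usr/metrics/ex/metrics-spark.jar "$1" "$2" "$3"
```

The glob replaces the backticked `ls`, and the empty-list check lives inside the helper, where it actually matters, instead of on a variable that has already been initialized to a non-empty string.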
I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad name.
The code doesn't have any stateful operations, so I guess it's okay that it
doesn't use checkpointing. I have executed this code hundreds of times in a
Cloudera VM and never got this error.

2015-06-27 11:21 GMT+02:00 Tathagata Das <t...@databricks.com>:

> 1. You need checkpointing mostly for recovering from driver failures, and
> in some cases also for some stateful operations.
>
> 2. Could you try not using the SPARK_CLASSPATH environment variable?
>
> TD
>
> On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> I don't have any checkpointing in my code. I really don't need to save any
>> state; it's just log processing for a PoC.
>> I have been testing the code in a Cloudera VM and never got that error.
>> Now it's running on a real cluster.
>>
>> The command to execute Spark:
>>
>> spark-submit --name "PoC Logs" --master yarn-client \
>>   --class com.metrics.MetricsSpark --jars $SPARK_CLASSPATH \
>>   --executor-memory 1g /usr/metrics/ex/metrics-spark.jar $1 $2 $3
>>
>> import kafka.serializer.StringDecoder
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
>>
>> val sparkConf = new SparkConf()
>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
>> val topics = args(1).split("\\,")
>> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>>   StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>>
>> directKafkaStream.foreachRDD { rdd =>
>>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
>>     .....
>>   }
>>   // rest of the foreachRDD body not shown in the original message
>> }
>>
>> I understand that I only need checkpointing if I need to recover the task
>> when something goes wrong, right?
>>
>>
>> 2015-06-27 9:39 GMT+02:00 Tathagata Das <t...@databricks.com>:
>>
>>> How are you trying to execute the code again? From checkpoints, or
>>> otherwise?
>>> Also cc'ed Hari, who may have a better idea about YARN-related issues.
>>>
>>> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm running Spark Streaming code with Kafka. The code was working, but
>>>> today I tried to run it again and got an exception. I don't know what's
>>>> happening. Right now there are no jobs running on YARN.
>>>> How could I fix it?
>>>>
>>>> Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
>>>> 	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
>>>> 	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>>>> 	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>>>> 	at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
>>>> 	at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
>>>> 	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
>>>> 	at com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
>>>> 	at com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> 	at java.lang.reflect.Method.invoke(Method.java:606)
>>>> 	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>>>> 	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>>>> 	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>>>> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>>>> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>> 15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete Spark local dirs
>>>> java.lang.NullPointerException
>>>> 	at org.apache.spark.storage.DiskBlockManager.org$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>>>> 	at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>>>> 	at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>>> 	at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>>> 	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>>>> 	at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
>>>> Exception in thread "delete Spark local dirs" java.lang.NullPointerException
>>>> 	at org.apache.spark.storage.DiskBlockManager.org$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>>>> 	at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>>>> 	at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>>> 	at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>>> 	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>>>> 	at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)