Well, SPARK_CLASSPATH is just an arbitrary name; the complete script is this:

export HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_CLASSPATH="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
for lib in /usr/metrics/lib/*.jar
do
    if [ -z "$SPARK_CLASSPATH" ]; then
        SPARK_CLASSPATH=$lib
    else
        SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
    fi
done
spark-submit --name "Metrics"....
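For what it's worth, the jar-joining loop can be tested on its own like this. The temporary directory below is only a stand-in for a real jar directory such as /usr/metrics/lib, so the snippet is self-contained:

```shell
# Sketch: join all jars in a directory into the comma-separated list
# that spark-submit's --jars flag expects. The demo directory stands
# in for a real jar directory like /usr/metrics/lib.
LIB_DIR=$(mktemp -d)
touch "$LIB_DIR/a.jar" "$LIB_DIR/b.jar"

JARS=""
for lib in "$LIB_DIR"/*.jar; do
    if [ -z "$JARS" ]; then
        JARS="$lib"
    else
        JARS="$JARS,$lib"
    fi
done
echo "$JARS"
```

Using the shell glob directly (instead of `ls` in backticks) avoids word-splitting surprises if a path ever contains spaces.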

I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad
name for the variable.

The code doesn't have any stateful operations, so I guess it's okay that it
doesn't have a checkpoint. I have executed this code hundreds of times in
the Cloudera VM and never got this error.
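For reference, if driver-failure recovery ever were needed, checkpointing would be wired in roughly like this. This is only a sketch (it needs the Spark Streaming dependency to compile), and the checkpoint path and object name are hypothetical, not from the original code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointSketch {
  // Hypothetical checkpoint location; use a reliable store such as HDFS.
  val checkpointDir = "hdfs:///tmp/metrics-checkpoint"

  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... build the Kafka direct stream and the rest of the pipeline here
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recreates the context from the checkpoint after a driver restart,
    // or calls createContext() on the first run.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```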

2015-06-27 11:21 GMT+02:00 Tathagata Das <t...@databricks.com>:

> 1. you need checkpointing mostly for recovering from driver failures, and
> in some cases also for some stateful operations.
>
> 2. Could you try not using the SPARK_CLASSPATH environment variable.
>
> TD
>
> On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz <konstt2...@gmail.com>
> wrote:
>
>> I don't have any checkpoint in my code. Really, I don't have to save any
>> state. It's just the log processing of a PoC.
>> I have been testing the code in a VM from Cloudera and I never got that
>> error. Now it's in a real cluster.
>>
>> The command to execute Spark
>> spark-submit --name "PoC Logs" --master yarn-client --class
>> com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
>> /usr/metrics/ex/metrics-spark.jar $1 $2 $3
>>
>>     val sparkConf = new SparkConf()
>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>     val kafkaParams = Map[String, String]("metadata.broker.list" ->
>> args(0))
>>     val topics = args(1).split("\\,")
>>     val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>>
>>     directKafkaStream.foreachRDD { rdd =>
>>       val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>       val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
>>       .....
>>    }
>>
>> I understand that I only need a checkpoint if I need to recover the task
>> when something goes wrong, right?
>>
>>
>> 2015-06-27 9:39 GMT+02:00 Tathagata Das <t...@databricks.com>:
>>
>>> How are you trying to execute the code again? From checkpoints, or
>>> otherwise?
>>> Also cc'ed Hari who may have a better idea of YARN related issues.
>>>
>>> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm executing a Spark Streaming job with Kafka. The code was working,
>>>> but today I tried to execute it again and I got an exception; I don't
>>>> know what's happening. Right now there are no jobs executing on YARN.
>>>> How can I fix it?
>>>>
>>>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>>>> application has already ended! It might have been killed or unable to
>>>> launch application master.
>>>>         at
>>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
>>>>         at
>>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>>>>         at
>>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>>>>         at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
>>>>         at
>>>> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
>>>>         at
>>>> org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
>>>>         at
>>>> com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
>>>>         at
>>>> com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>> *15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete
>>>> Spark local dirs*
>>>> java.lang.NullPointerException
>>>>         at org.apache.spark.storage.DiskBlockManager.org
>>>> $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>>>>         at
>>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>>>>         at
>>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>>>         at
>>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>>>         at
>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>>>>         at
>>>> org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
>>>> Exception in thread "delete Spark local dirs"
>>>> java.lang.NullPointerException
>>>>         at org.apache.spark.storage.DiskBlockManager.org
>>>> $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>>>>         at
>>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>>>>         at
>>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>>>         at
>>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>>>         at
>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>>>>         at
>>>> org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
