Crashed again. On the driver, the logs say:
14/08/29 19:04:55 INFO BlockManagerMaster: Removed 7 successfully in removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 4 times, most recent failure: TID 6383 on host node-dn1-2-acme.com failed for unknown reason
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I went and looked at the OS on node-dn1-2 and at the container logs for
TID 6383, but found nothing that explains the failure:
# grep 6383 stderr
14/08/29 18:52:51 INFO CoarseGrainedExecutorBackend: Got assigned task 6383
14/08/29 18:52:51 INFO Executor: Running task ID 6383

However, the last message in the container log is timestamped 19:04:51,
which tells me the executor was killed for some reason right before the
driver noticed the executor/task failure.

How come my task failed after only 4 attempts, although my config says the
failure threshold is 64?
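
One thing I notice re-reading my submit command (quoted below): spark-submit
treats everything after the application jar as arguments to the application
itself, so the --jars list and all the --spark.* flags, including
--spark.task.maxFailures 64, may never have reached Spark at all, which
would leave the default of 4 in effect. On top of that, --spark.task.maxFailures
is not a spark-submit flag in the first place; spark.* properties belong in
spark-defaults.conf, a --properties-file, or SparkConf. A minimal sketch of
setting them in code instead (assuming nothing in the app depends on reading
them from the command line):

import org.apache.spark.SparkConf

// Set the spark.* properties directly on the SparkConf so they
// take effect regardless of how spark-submit is invoked.
val sparkConf = new SparkConf()
  .setAppName("SparkKafkaTest")
  .set("spark.task.maxFailures", "64")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.scheduler.mode", "FAIR")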








On Fri, Aug 29, 2014 at 12:00 PM, Tim Smith <secs...@gmail.com> wrote:

> I wrote a long post about how I arrived here, but in a nutshell: I don't
> see evidence of re-partitioning or of workload distribution across the
> cluster. My new-fangled way of starting the job is:
>
> run=`date +"%m-%d-%YT%T"`; \
> nohup spark-submit --class logStreamNormalizer \
> --master yarn log-stream-normalizer_2.10-1.0.jar \
> --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
> --driver-memory 8G \
> --executor-memory 30G \
> --executor-cores 16 \
> --num-executors 8 \
> --spark.serializer org.apache.spark.serializer.KryoSerializer \
> --spark.rdd.compress true \
> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
> --spark.akka.threads 16 \
> --spark.task.maxFailures 64 \
> --spark.scheduler.mode FAIR \
> >logs/normRunLog-$run.log \
> 2>logs/normRunLogError-$run.log & \
> echo $! > logs/run-$run.pid
>
> Since the job spits out lots of logs, here is how I am trying to determine
> if any tasks got assigned to non-local executors.
> $ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log | grep Starting | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL
>
> Yields no lines.
>
> If I look at resource pool usage in YARN, this app is assigned 252.5GB of
> memory, 128 VCores and 9 containers. Am I missing something here?
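>
> (If I am reading the YARN numbers right, 9 containers would be the 8
> executors plus the ApplicationMaster, and 252.5 GB would be 8 x 30 GB plus
> per-container memory overhead, so the allocation itself looks like what
> was requested.)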
>
> Thanks,
>
> Tim
>
>
>
>
>
>
>
> On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> I set partitions to 64:
>>
>> //
>>  kInMsg.repartition(64)
>>  val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>> //
>>
>> Still see all activity only on the two nodes that seem to be receiving
>> from Kafka.
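>>
>> (One thing I should double-check here: repartition() returns a new,
>> repartitioned DStream rather than modifying kInMsg in place, so as
>> written above the result is discarded and the map still runs on the
>> original stream. A sketch of what I probably intended, reusing my own
>> names:
>>
>>  val repartitioned = kInMsg.repartition(64)
>>  val outdata = repartitioned.map(x => normalizeLog(x._2, configMap))
>> )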
>>
>> On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith <secs...@gmail.com> wrote:
>> > TD - Apologies, didn't realize I was replying to you instead of the
>> list.
>> >
>> > What does "numPartitions" refer to when calling createStream? I read an
>> > earlier thread that seemed to suggest that numPartitions translates to
>> > partitions created on the Spark side?
>> >
>> > http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
>> >
>> > Actually, I re-tried with 64 numPartitions in createStream and that
>> > didn't work. I will manually set "repartition" to 64/128 and see how
>> > that goes.
>> >
>> > Thanks.
>> >
>> >
>> >
>> >
>> > On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das <
>> tathagata.das1...@gmail.com>
>> > wrote:
>> >>
>> >> Having 16 partitions in KafkaUtils.createStream does not translate to
>> >> the RDDs in Spark / Spark Streaming having 16 partitions. Repartition
>> >> is the best way to distribute the received data between all the nodes,
>> >> as long as there is a sufficient number of partitions (try setting it
>> >> to 2x the number of cores given to the application).
>> >>
>> >> Yeah, in 1.0.0, ttl should be unnecessary.
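>> >>
>> >> (To make the distinction concrete, a sketch using the names from your
>> >> code: the 16 in the topic map sets how many threads the single receiver
>> >> uses to consume from Kafka, not how many partitions the resulting RDDs
>> >> have. The spreading across the cluster has to come from repartition:
>> >>
>> >>  // 16 = receiver consumer threads, not RDD partitions
>> >>  val kInMsg = KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka",
>> >>    "normApp", Map("rawunstruct" -> 16))
>> >>  // ~2x the 64 cores (8 executors x 8 cores) given to the app
>> >>  val distributed = kInMsg.repartition(128)
>> >> )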
>> >>
>> >>
>> >>
>> >> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith <secs...@gmail.com> wrote:
>> >>>
>> >>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
>> >>> <tathagata.das1...@gmail.com> wrote:
>> >>>>
>> >>>> If you are repartitioning to 8 partitions, and your nodes happen to
>> >>>> have at least 4 cores each, it's possible that all 8 partitions are
>> >>>> assigned to only 2 nodes. Try increasing the number of partitions.
>> >>>> Also make sure you have executors (allocated by YARN) running on more
>> >>>> than two nodes if you want to use all 11 nodes in your YARN cluster.
>> >>>
>> >>>
>> >>> If you look at the code, I commented out the manual re-partitioning
>> >>> to 8. Instead, I created 16 partitions when I call createStream. But
>> >>> I will increase the partitions to, say, 64 and see if I get better
>> >>> parallelism.
>> >>>
>> >>>>
>> >>>>
>> >>>> If you are using Spark 1.x, then you don't need to set the ttl for
>> >>>> running Spark Streaming. In case you are using an older version, why
>> >>>> do you want to reduce it? You could reduce it, but it does increase
>> >>>> the risk of premature cleaning if, once in a while, things get
>> >>>> delayed by 20 seconds. I don't see much harm in keeping the ttl at 60
>> >>>> seconds (a bit of extra garbage shouldn't hurt performance).
>> >>>>
>> >>>
>> >>> I am running 1.0.0 (CDH5), so the ttl setting is redundant? But you
>> >>> are right, unless I have memory issues, more aggressive pruning won't
>> >>> help.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Tim
>> >>>
>> >>>
>> >>>
>> >>>>
>> >>>> TD
>> >>>>
>> >>>>
>> >>>> On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith <secs...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> In my streaming app, I receive from Kafka, and I have tried setting
>> >>>>> the partitions both when calling "createStream" and later, by
>> >>>>> calling repartition. In both cases, the number of nodes running the
>> >>>>> tasks seems to be stubbornly stuck at 2. Since I have 11 nodes in my
>> >>>>> cluster, I was hoping to use more of them.
>> >>>>>
>> >>>>> I am starting the job as:
>> >>>>> nohup spark-submit --class logStreamNormalizer --master yarn
>> >>>>> log-stream-normalizer_2.10-1.0.jar --jars
>> >>>>> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>> >>>>> --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
>> >>>>> --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo
>> $! >
>> >>>>> run-6.pid
>> >>>>>
>> >>>>> My main code is:
>> >>>>>  val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
>> >>>>>  val ssc = new StreamingContext(sparkConf,Seconds(5))
>> >>>>>  val kInMsg = KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct" -> 16))
>> >>>>>
>> >>>>>  val propsMap = Map("metadata.broker.list" ->
>> >>>>> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class"
>> >>>>> -> "kafka.serializer.StringEncoder", "producer.type" -> "async",
>> >>>>> "request.required.acks" -> "1")
>> >>>>>  val to_topic = """normStruct"""
>> >>>>>  val writer = new KafkaOutputService(to_topic, propsMap)
>> >>>>>
>> >>>>>
>> >>>>>  if (!configMap.keySet.isEmpty)
>> >>>>>  {
>> >>>>>   //kInMsg.repartition(8)
>> >>>>>   val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>> >>>>>   outdata.foreachRDD((rdd,time) => { rdd.foreach(rec => {
>> >>>>> writer.output(rec) }) } )
>> >>>>>  }
>> >>>>>
>> >>>>>  ssc.start()
>> >>>>>  ssc.awaitTermination()
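>> >>>>>
>> >>>>> (An aside I am not sure about: writer is created on the driver and
>> >>>>> then used inside rdd.foreach, so it has to be serialized out to
>> >>>>> every task. Assuming KafkaOutputService can be constructed on the
>> >>>>> executors, a per-partition sketch that would avoid shipping it:
>> >>>>>
>> >>>>>  outdata.foreachRDD { rdd =>
>> >>>>>    rdd.foreachPartition { part =>
>> >>>>>      // hypothetical: one producer per partition, built executor-side
>> >>>>>      val w = new KafkaOutputService(to_topic, propsMap)
>> >>>>>      part.foreach(rec => w.output(rec))
>> >>>>>    }
>> >>>>>  }
>> >>>>> )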
>> >>>>>
>> >>>>> In terms of total delay, with a 5-second batch, the delays usually
>> >>>>> stay under 5 seconds but sometimes jump to ~10 seconds. As a
>> >>>>> performance-tuning question, does this mean I can reduce my cleaner
>> >>>>> ttl from 60 to, say, 25 (still more than double the peak delay)?
>> >>>>>
>> >>>>> Thanks
>> >>>>>
>> >>>>> Tim
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
>
