Crash again. On the driver, the logs say:

14/08/29 19:04:55 INFO BlockManagerMaster: Removed 7 successfully in removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 4 times, most recent failure: TID 6383 on host node-dn1-2-acme.com failed for unknown reason
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
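Side note: since the container's own stderr just stops, my next step is to pull the aggregated YARN logs for the whole application and look for the NodeManager's reason for taking the container away (for example, killing it for exceeding memory limits). This is only a rough sketch - it assumes log aggregation is enabled on the cluster, and the application ID below is a placeholder for the real one:

# Placeholder app ID - use the real one from "yarn application -list" or the RM UI
yarn logs -applicationId application_XXXXXXXXXXXXX_XXXX > app-logs.txt

# Look for the lost task/executor and any NodeManager kill messages
# (the exact wording of the memory-limit message varies by Hadoop version)
grep -iE "6383|killing container|memory limits" app-logs.txt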
On node-dn1-2 itself, I looked at the OS and at the container logs for TID 6383 but found nothing useful:

# grep 6383 stderr
14/08/29 18:52:51 INFO CoarseGrainedExecutorBackend: Got assigned task 6383
14/08/29 18:52:51 INFO Executor: Running task ID 6383

However, the last message in the container log is timestamped 19:04:51, which tells me the executor was killed for some reason right before the driver noticed the executor/task failure.

Also, why did my task fail after only 4 attempts when my config sets the failure threshold to 64? (A sketch of what I plan to change, based on the thread below, is at the very bottom of this mail.)

On Fri, Aug 29, 2014 at 12:00 PM, Tim Smith <secs...@gmail.com> wrote:
> I wrote a long post about how I arrived here, but in a nutshell: I don't see
> evidence of re-partitioning and workload distribution across the cluster.
> My newfangled way of starting the job is:
>
> run=`date +"%m-%d-%YT%T"`; \
> nohup spark-submit --class logStreamNormalizer \
> --master yarn log-stream-normalizer_2.10-1.0.jar \
> --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
> --driver-memory 8G \
> --executor-memory 30G \
> --executor-cores 16 \
> --num-executors 8 \
> --spark.serializer org.apache.spark.serializer.KryoSerializer \
> --spark.rdd.compress true \
> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
> --spark.akka.threads 16 \
> --spark.task.maxFailures 64 \
> --spark.scheduler.mode FAIR \
> >logs/normRunLog-$run.log \
> 2>logs/normRunLogError-$run.log & \
> echo $! > logs/run-$run.pid
>
> Since the job spits out lots of logs, here is how I am trying to determine
> whether any tasks got assigned to non-local executors:
>
> $ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log | grep Starting | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL
>
> This yields no lines.
>
> If I look at resource pool usage in YARN, this app is assigned 252.5GB of
> memory, 128 VCores and 9 containers. Am I missing something here?
>
> Thanks,
>
> Tim
>
> On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> I set partitions to 64:
>>
>> //
>> kInMsg.repartition(64)
>> val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>> //
>>
>> I still see all activity only on the two nodes that seem to be receiving
>> from Kafka.
>>
>> On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith <secs...@gmail.com> wrote:
>> > TD - Apologies, didn't realize I was replying to you instead of the list.
>> >
>> > What does "numPartitions" refer to when calling createStream? I read an
>> > earlier thread that seemed to suggest that numPartitions translates to
>> > partitions created on the Spark side?
>> > http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
>> >
>> > Actually, I re-tried with 64 numPartitions in createStream and that didn't
>> > work. I will manually set "repartition" to 64/128 and see how that goes.
>> >
>> > Thanks.
>> >
>> > On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>> >>
>> >> Having 16 partitions in KafkaUtils.createStream does not translate to the
>> >> RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
>> >> best way to distribute the received data between all the nodes, as long as
>> >> there is a sufficient number of partitions (try setting it to 2x the number
>> >> of cores given to the application).
>> >>
>> >> Yeah, in 1.0.0, ttl should be unnecessary.
>> >>
>> >> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith <secs...@gmail.com> wrote:
>> >>>
>> >>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>> >>>>
>> >>>> If you are repartitioning to 8 partitions, and your nodes happen to have
>> >>>> at least 4 cores each, it's possible that all 8 partitions are assigned to
>> >>>> only 2 nodes. Try increasing the number of partitions. Also make sure you
>> >>>> have executors (allocated by YARN) running on more than two nodes if you
>> >>>> want to use all 11 nodes in your YARN cluster.
>> >>>
>> >>> If you look at the code, I commented out the manual re-partitioning to 8.
>> >>> Instead, I am creating 16 partitions when I call createStream. But I will
>> >>> increase the partitions to, say, 64 and see if I get better parallelism.
>> >>>
>> >>>> If you are using Spark 1.x, then you don't need to set the ttl for
>> >>>> running Spark Streaming. In case you are using an older version, why do you
>> >>>> want to reduce it? You could reduce it, but it does increase the risk of
>> >>>> premature cleaning if once in a while things get delayed by 20 seconds. I
>> >>>> don't see much harm in keeping the ttl at 60 seconds (a bit of extra garbage
>> >>>> shouldn't hurt performance).
>> >>>
>> >>> I am running 1.0.0 (CDH5), so the ttl setting is redundant? But you are right,
>> >>> unless I have memory issues, more aggressive pruning won't help.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Tim
>> >>>
>> >>>> TD
>> >>>>
>> >>>> On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith <secs...@gmail.com> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> In my streaming app, I receive from Kafka, and I have tried setting the
>> >>>>> partitions either when calling "createStream" or later, by calling repartition
>> >>>>> - in both cases, the number of nodes running the tasks seems to be
>> >>>>> stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping to
>> >>>>> use more nodes.
>> >>>>>
>> >>>>> I am starting the job as:
>> >>>>> nohup spark-submit --class logStreamNormalizer --master yarn log-stream-normalizer_2.10-1.0.jar --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8 --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo $! > run-6.pid
>> >>>>>
>> >>>>> My main code is:
>> >>>>>
>> >>>>> val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
>> >>>>> val ssc = new StreamingContext(sparkConf,Seconds(5))
>> >>>>> val kInMsg = KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct" -> 16))
>> >>>>>
>> >>>>> val propsMap = Map("metadata.broker.list" -> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" -> "kafka.serializer.StringEncoder", "producer.type" -> "async", "request.required.acks" -> "1")
>> >>>>> val to_topic = """normStruct"""
>> >>>>> val writer = new KafkaOutputService(to_topic, propsMap)
>> >>>>>
>> >>>>> if (!configMap.keySet.isEmpty)
>> >>>>> {
>> >>>>>   //kInMsg.repartition(8)
>> >>>>>   val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>> >>>>>   outdata.foreachRDD((rdd,time) => { rdd.foreach(rec => { writer.output(rec) }) })
>> >>>>> }
>> >>>>>
>> >>>>> ssc.start()
>> >>>>> ssc.awaitTermination()
>> >>>>>
>> >>>>> In terms of total delay, with a 5 second batch, the delays usually stay
>> >>>>> under 5 seconds, but sometimes jump to ~10 seconds. As a performance-tuning
>> >>>>> question, does this mean I can reduce my cleaner ttl from 60 to, say, 25
>> >>>>> (still more than double the peak delay)?
>> >>>>>
>> >>>>> Thanks
>> >>>>>
>> >>>>> Tim
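(Sketch referenced at the top of this mail - untested, just what I plan to try next.)

Two things I now suspect, re-reading the thread:

1. DStream.repartition returns a new DStream rather than modifying the stream in place, so calling kInMsg.repartition(64) and then mapping over the original kInMsg leaves the data wherever the receivers put it. I need to chain off the repartitioned stream instead.

2. spark-submit only understands its own named options, and (as far as I can tell on 1.0.0, which has no generic --conf flag) anything it doesn't recognize - such as --spark.task.maxFailures - just gets handed to the application's main() and never reaches SparkConf. That would explain why spark.task.maxFailures stayed at its default of 4. Setting these properties on SparkConf in the driver (or in conf/spark-defaults.conf) should make them stick.

Roughly, reusing the names from my existing job (normalizeLog, configMap, writer):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Set Spark properties in code so they don't depend on spark-submit parsing them
val sparkConf = new SparkConf()
  .setAppName("SparkKafkaTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.task.maxFailures", "64")
  .set("spark.scheduler.mode", "FAIR")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val kInMsg = KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct" -> 16))

// Keep the repartitioned stream and transform *that*, not the original kInMsg
val repartitioned = kInMsg.repartition(64)
val outdata = repartitioned.map(x => normalizeLog(x._2, configMap))
outdata.foreachRDD((rdd, time) => rdd.foreach(rec => writer.output(rec)))

ssc.start()
ssc.awaitTermination()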