Try adding the following to the SparkConf:

.set("spark.core.connection.ack.wait.timeout", "6000")
.set("spark.akka.frameSize", "60")

I used to face that issue with Spark 1.1.0.

Thanks
Best Regards

On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos <bar...@chaordicsystems.com> wrote:

> Dear Spark'ers,
>
> I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
> YARN (yarn-cluster), but unfortunately I'm facing many issues. In short,
> my job does the following:
> - Consumes a specific Kafka topic
> - Writes its content to S3 or HDFS
>
> Records in Kafka are in the form:
> {"key": "someString"}
>
> This is important because I use the value of "key" to define the output
> file name in S3.
> Here are the Spark and Kafka parameters I'm using:
>
>> val sparkConf = new SparkConf()
>>   .setAppName("MyDumperApp")
>>   .set("spark.task.maxFailures", "100")
>>   .set("spark.hadoop.validateOutputSpecs", "false")
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>> val kafkaParams = Map(
>>   "zookeeper.connect" -> zkQuorum,
>>   "zookeeper.session.timeout.ms" -> "10000",
>>   "rebalance.backoff.ms" -> "8000",
>>   "rebalance.max.retries" -> "10",
>>   "group.id" -> group,
>>   "auto.offset.reset" -> "largest"
>> )
>
> My application is the following:
>
>> KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>     ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
>>   .foreachRDD((rdd, time) =>
>>     rdd.map {
>>       case (_, line) =>
>>         val json = parse(line)
>>         val key = extract(json, "key").getOrElse("key_not_found")
>>         (key, dateFormatter.format(time.milliseconds)) -> line
>>     }
>>     .partitionBy(new HashPartitioner(10))
>>     .saveAsHadoopFile[KeyBasedOutput[(String, String), String]](
>>       "s3://BUCKET", classOf[BZip2Codec])
>>   )
>
> And the last piece:
>
>> class KeyBasedOutput[T >: Null, V <: AnyRef]
>>     extends MultipleTextOutputFormat[T, V] {
>>   override protected def generateFileNameForKeyValue(key: T, value: V,
>>       leaf: String) = key match {
>>     case (myKey, batchId) =>
>>       "somedir" + "/" + myKey + "/" +
>>         "prefix-" + myKey + "_" + batchId + "_" + leaf
>>   }
>>   override protected def generateActualKey(key: T, value: V) = null
>> }
>
> I use batch sizes of 5 minutes with checkpoints activated.
> The job fails nondeterministically (I think it never ran longer than ~5
> hours). I have no clue why, it simply fails.
> Please find below the exceptions thrown by my application.
>
> I really appreciate any kind of hint.
> Thank you very much in advance.
>
> Regards,
> -- Flávio
>
> ==== Executor 1
>
> 2014-12-10 19:05:15,150 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> 2014-12-10 19:05:15,201 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
> 2014-12-10 19:05:15,202 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> 2014-12-10 19:05:16,650 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
> 2014-12-10 19:05:16,651 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
> java.nio.channels.CancelledKeyException
>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> 2014-12-10 19:05:16,680 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
> 2014-12-10 19:05:16,681 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
> java.nio.channels.CancelledKeyException
>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> 2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM
>
> ==== Executor 2
>
> 2014-12-10 19:05:15,010 INFO [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
> 2014-12-10 19:05:15,157 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
> 2014-12-10 19:05:15,157 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> 2014-12-10 19:05:15,157 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> 2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
> 2014-12-10 19:05:15,158 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
> java.nio.channels.CancelledKeyException
>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>
> ==== Driver
>
> 2014-12-10 19:05:13,805 INFO [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
> 2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
> java.io.FileNotFoundException: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>         at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>         at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 2014-12-10 19:05:13,829 INFO [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED
>
> --
> Flávio R. Santos
> Chaordic | Platform
> www.chaordic.com.br
> +55 48 3232.3200
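For reference, the settings suggested at the top of this thread, merged into the SparkConf from Flávio's mail, would look roughly like the sketch below. This is a config fragment only, not a tested fix; everything except the last two lines comes from the quoted mail, and the last two lines are the suggested additions (in Spark 1.x, `spark.core.connection.ack.wait.timeout` is in seconds and `spark.akka.frameSize` is in MB).

```scala
import org.apache.spark.SparkConf

// Original SparkConf from the quoted mail, plus the two suggested settings.
val sparkConf = new SparkConf()
  .setAppName("MyDumperApp")
  .set("spark.task.maxFailures", "100")
  .set("spark.hadoop.validateOutputSpecs", "false")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
  // Suggested additions from this thread:
  .set("spark.core.connection.ack.wait.timeout", "6000") // seconds to wait for acks
  .set("spark.akka.frameSize", "60")                     // max Akka message size, MB
```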