Try adding the following to your SparkConf:

 .set("spark.core.connection.ack.wait.timeout","6000")

      .set("spark.akka.frameSize","60")

I used to face that issue with Spark 1.1.0.
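
For reference, here's a minimal sketch of your conf with those two settings merged in (the values are just what worked for me; if I remember right, the ack timeout is in seconds and the frame size in MB, so tune them for your workload):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyDumperApp")
  .set("spark.task.maxFailures", "100")
  .set("spark.hadoop.validateOutputSpecs", "false")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
  // Wait longer for ACKs before a remote connection is assumed dead
  .set("spark.core.connection.ack.wait.timeout", "6000")
  // Allow larger Akka messages between driver and executors
  .set("spark.akka.frameSize", "60")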

Thanks
Best Regards

On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos <bar...@chaordicsystems.com>
wrote:

> Dear Spark'ers,
>
> I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
> YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my
> job does the following:
> - Consumes a specific Kafka topic
> - Writes its content to S3 or HDFS
>
> Records in Kafka are in the form:
> {"key": "someString"}
>
> This is important because I use the value of "key" to define the output
> file name in S3.
> Here are the Spark and Kafka parameters I'm using:
>
> val sparkConf = new SparkConf()
>>   .setAppName("MyDumperApp")
>>   .set("spark.task.maxFailures", "100")
>>   .set("spark.hadoop.validateOutputSpecs", "false")
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>> val kafkaParams = Map(
>>   "zookeeper.connect" -> zkQuorum,
>>   "zookeeper.session.timeout.ms" -> "10000",
>>   "rebalance.backoff.ms" -> "8000",
>>   "rebalance.max.retries" -> "10",
>>   "group.id" -> group,
>>   "auto.offset.reset" -> "largest"
>> )
>
>
> My application is the following:
>
> KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>     ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
>>   .foreachRDD((rdd, time) =>
>>     rdd.map {
>>       case (_, line) =>
>>         val json = parse(line)
>>         val key = extract(json, "key").getOrElse("key_not_found")
>>         (key, dateFormatter.format(time.milliseconds)) -> line
>>     }
>>       .partitionBy(new HashPartitioner(10))
>>       .saveAsHadoopFile[KeyBasedOutput[(String, String), String]](
>>         "s3://BUCKET", classOf[BZip2Codec])
>>   )
>
>
> And the last piece:
>
> class KeyBasedOutput[T >: Null, V <: AnyRef]
>>     extends MultipleTextOutputFormat[T, V] {
>>   override protected def generateFileNameForKeyValue(key: T, value: V,
>>       leaf: String) = key match {
>>     case (myKey, batchId) =>
>>       "somedir" + "/" + myKey + "/" +
>>         "prefix-" + myKey + "_" + batchId + "_" + leaf
>>   }
>>   override protected def generateActualKey(key: T, value: V) = null
>> }
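>
> For a record like {"key": "someString"}, this yields paths of the form
> (illustrative; <batchId> is whatever dateFormatter produces for the batch
> time, and part-00000 is the Hadoop leaf file name):
>
>> s3://BUCKET/somedir/someString/prefix-someString_<batchId>_part-00000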
>
>
> I use 5-minute batches with checkpointing enabled.
> The job fails nondeterministically (I don't think it ever ran longer than
> ~5 hours), and I have no clue why; it simply fails.
> Please find below the exceptions thrown by my application.
>
> I'd really appreciate any hints.
> Thank you very much in advance.
>
> Regards,
> -- Flávio
>
> ==== Executor 1
>
> 2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> 2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
> 2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> 2014-12-10 19:05:16,650 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
> 2014-12-10 19:05:16,651 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
> *java.nio.channels.CancelledKeyException*
>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> 2014-12-10 19:05:16,680 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
> 2014-12-10 19:05:16,681 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
> *java.nio.channels.CancelledKeyException*
>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> 2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM
>
> ==== Executor 2
>
> 2014-12-10 19:05:15,010 INFO  [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
> 2014-12-10 19:05:15,157 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> 2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
> 2014-12-10 19:05:15,158 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
> *java.nio.channels.CancelledKeyException*
>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>
> ==== Driver
>
> 2014-12-10 19:05:13,805 INFO  [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
> 2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
> *java.io.FileNotFoundException*: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>         at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>         at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 2014-12-10 19:05:13,829 INFO  [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED
>
>
> *--Flávio R. Santos*
>
> Chaordic | *Platform*
> *www.chaordic.com.br*
> +55 48 3232.3200
>
