To answer your question specifically - you can bump the value on 
spark.streaming.kafka.maxRetries (see configuration guide: 
http://spark.apache.org/docs/latest/configuration.html).

That being said, you should avoid this by adding some validation in your 
deserializaiton / parse code.

A quick and dirty way to do it is:

        val lines = messages.flatMapValues(v => Try(v.toInt).toOption)


This way, only the lines that are successfully parsed are kept around.
Read a bit on scala.util.{Try, Success, Failure} and Options to understand 
what’s going on.

-adrian



On 10/12/15, 9:05 AM, "Amit Singh Hora" <hora.a...@gmail.com> wrote:

>I am running spark locally to understand how countByValueAndWindow works
>
>
>  
>      val Array(brokers, topics) = Array("192.XX.X.XX:9092", "test1")
>  
>      // Create context with 2 second batch interval
>      val sparkConf = new
>SparkConf().setAppName("ReduceByWindowExample").setMaster("local[1,1]")
>      sparkConf.set("spark.task.maxFailures","1")
>      
>      val ssc = new StreamingContext(sparkConf, Seconds(1)) // batch size 1
>      ssc.checkpoint("D:\\SparkCheckPointDirectory")
>      
>      // Create direct kafka stream with brokers and topics
>      val topicsSet = topics.split(",").toSet
>      val kafkaParams = Map[String, String]("metadata.broker.list" ->
>brokers)
>  
>      val messages = KafkaUtils.createDirectStream[String, String,
>StringDecoder, StringDecoder](
>        ssc, kafkaParams, topicsSet)
>  
>      // Get the lines, split them into words, count the words and print
>      val lines = messages.map(_._2.toInt)
>      val keyValuelines = lines.map { x => (x, 1) }
>  
>      val windowedlines=lines.countByValueAndWindow(Seconds(1),Seconds(1))
>      //window,interval
>      //    val windowedlines = lines.reduceByWindow((x, y) => { x + y },
>Seconds(4) , Seconds(2))
>          windowedlines.print()
>  
>      ssc.start()
>      ssc.awaitTermination()
>
>
>everything works file till numeric data is supplied on the kafka topic as I
>am using toInt ,when a blank string "" is written on kafka topic it fails
>complaining NumberFormatExceotion that is OK,but the problem is it is
>retrying this indefinetly again and again and complaining the same
>NumberFormatException Is there any way to control number of time spark will
>try to convert string to Int ,like Spark should try it only [times] and then
>move to next batch of data
>
>Note - I am using Spark 1.4
>    
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Spark-retrying-task-indefinietly-tp25022.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>---------------------------------------------------------------------
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>

Reply via email to