I am running Spark locally to understand how countByValueAndWindow works.

      val Array(brokers, topics) = Array("192.XX.X.XX:9092", "test1")
      // Create context with 1 second batch interval
      val sparkConf = new
      val ssc = new StreamingContext(sparkConf, Seconds(1)) // batch size 1
      // Create direct kafka stream with brokers and topics
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String]("metadata.broker.list" ->
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)
      // Parse each message value as an Int, then count occurrences per window
      val lines = messages.map(_._2.toInt)
      val keyValuelines = lines.map { x => (x, 1) }
      val windowedlines = lines.countByValueAndWindow(Seconds(1), Seconds(1))
      //    val windowedlines = lines.reduceByWindow((x, y) => { x + y }, Seconds(4), Seconds(2))

Everything works fine as long as numeric data is supplied on the Kafka topic,
since I am using toInt. When a blank string "" is written to the Kafka topic,
it fails with a NumberFormatException. That is OK, but the problem is that
Spark retries indefinitely, again and again, reporting the same
NumberFormatException. Is there any way to control the number of times Spark
will try to convert the string to an Int? For example, Spark should try it
only [times] and then move on to the next batch of data.

Note - I am using Spark 1.4
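
For reference, one possible way to sidestep the failure instead of limiting retries is to parse defensively, so that a blank or non-numeric message is dropped rather than throwing. The sketch below is plain Scala with no Spark dependency. The names SafeParse and parseIntOption are hypothetical helpers made up for illustration and are not part of Spark or of the code above.

```scala
import scala.util.Try

object SafeParse {
  // Hypothetical helper: Some(n) for a valid integer string, None for
  // anything that would make toInt throw (e.g. "" or "abc").
  def parseIntOption(s: String): Option[Int] = Try(s.trim.toInt).toOption

  def main(args: Array[String]): Unit = {
    val raw = Seq("1", "", "42", "abc", " 7 ")
    // flatMap keeps only the values that parsed successfully
    val parsed = raw.flatMap(s => parseIntOption(s).toList)
    println(parsed) // prints List(1, 42, 7)
  }
}
```

Assuming the same messages stream as in the snippet above, the equivalent change there would be something like messages.flatMap(m => SafeParse.parseIntOption(m._2).toList) in place of messages.map(_._2.toInt). This only avoids the exception and does not change how often a failed task is retried.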

