Spark retrying task indefinitely

2015-10-11 Thread Amit Singh Hora
I am running Spark locally to understand how countByValueAndWindow works:


  
  import kafka.serializer.StringDecoder

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val Array(brokers, topics) = Array("192.XX.X.XX:9092", "test1")

  // Create context with a 1 second batch interval; in local[1,1] the
  // second number is the number of task failures allowed before giving up
  val sparkConf = new SparkConf()
    .setAppName("ReduceByWindowExample")
    .setMaster("local[1,1]")
  sparkConf.set("spark.task.maxFailures", "1")

  val ssc = new StreamingContext(sparkConf, Seconds(1)) // 1 second batches
  ssc.checkpoint("D:\\SparkCheckPointDirectory")

  // Create direct Kafka stream with brokers and topics
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

  val messages = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

  // Parse each message value as an Int, then pair it up for counting
  val lines = messages.map(_._2.toInt)
  val keyValuelines = lines.map { x => (x, 1) }

  // countByValueAndWindow(windowDuration, slideDuration)
  val windowedlines = lines.countByValueAndWindow(Seconds(1), Seconds(1))
  //val windowedlines = lines.reduceByWindow((x, y) => { x + y },
  //  Seconds(4), Seconds(2))
  windowedlines.print()

  ssc.start()
  ssc.awaitTermination()


Everything works fine as long as numeric data is supplied on the Kafka topic,
since I am using toInt. When a blank string "" is written to the topic it
fails with a NumberFormatException; that is OK, but the problem is that it
retries the same batch indefinitely, failing with the same
NumberFormatException again and again. Is there any way to control the number
of times Spark will try to convert the string to an Int, i.e. Spark should
try it only [times] and then move on to the next batch of data?

Note - I am using Spark 1.4




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-retrying-task-indefinietly-tp25022.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark retrying task indefinitely

2015-10-12 Thread Adrian Tanase
To answer your question specifically - you can bump the value of
spark.streaming.kafka.maxRetries (see the configuration guide:
http://spark.apache.org/docs/latest/configuration.html).
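
For instance (a minimal sketch; the value 3 is only an illustration - the
property is set on the SparkConf before the StreamingContext is created):

  sparkConf.set("spark.streaming.kafka.maxRetries", "3")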

That being said, you should avoid this by adding some validation to your
deserialization / parsing code.

A quick and dirty way to do it is:

  import scala.util.Try

  val lines = messages.flatMapValues(v => Try(v.toInt).toOption)


This way, only the lines that are successfully parsed are kept around.
Read a bit on scala.util.{Try, Success, Failure} and Options to understand 
what’s going on.
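
Spelled out a bit more, here is a sketch of the same idea (safeToInt is a
hypothetical helper; also note that flatMapValues keeps the Kafka message
key, so you get (key, Int) pairs - the variant below drops the key and
yields plain Ints, matching the original lines stream):

  import scala.util.{Try, Success, Failure}

  // Try the conversion; keep successes, log and drop failures
  def safeToInt(v: String): Option[Int] = Try(v.toInt) match {
    case Success(n) => Some(n)
    case Failure(e) =>
      println(s"Dropping unparseable value '$v': $e")
      None
  }

  // A blank "" is simply dropped instead of failing the task
  val lines = messages.flatMap { case (_, v) => safeToInt(v) }
  val windowedlines = lines.countByValueAndWindow(Seconds(1), Seconds(1))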

-adrian


