Your Kafka broker died, or you otherwise had a rebalance. Normally Spark's retries take care of that.

Is there something going on with your Kafka installation such that the rebalance is taking especially long? Yes, increasing the backoff / max number of retries will "help", but it's better to figure out what's going on with Kafka.
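If you do want to turn those knobs, here's a minimal sketch, assuming Spark 1.x with the Kafka 0.8 direct stream. The app name is a placeholder; the topic and broker list are taken from your logs:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("DirectStreamWithRetries") // placeholder
  // How many consecutive times the driver retries finding leader
  // offsets before failing the batch (default is 1).
  .set("spark.streaming.kafka.maxRetries", "5")

val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
  // Kafka 0.8 consumer setting: how long to back off before refreshing
  // leader metadata after a leader change (default is 200 ms). The
  // direct stream sleeps this long between its retries.
  "refresh.leader.backoff.ms" -> "1000"
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("testtopic"))

Even with both raised, a long enough leader election will still eventually fail the batch, which is why it's worth finding the root cause.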
On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantaror...@gmail.com> wrote:

> Hi
>
> My streaming application gets killed with the below error:
>
> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
> [testtopic,193]))
> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
> time 1440626120000 ms
> org.apache.spark.SparkException:
> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([testtopic,115]))
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
> at
>
> Kafka params printed in the job logs are:
>
> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
> block.on.buffer.full = true
> retry.backoff.ms = 100
> buffer.memory = 1048576
> batch.size = 16384
> metrics.sample.window.ms = 30000
> metadata.max.age.ms = 300000
> receive.buffer.bytes = 32768
> timeout.ms = 30000
> max.in.flight.requests.per.connection = 5
> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
> metric.reporters = []
> client.id =
> compression.type = none
> retries = 0
> max.request.size = 1048576
> send.buffer.bytes = 131072
> acks = all
> reconnect.backoff.ms = 10
> linger.ms = 0
> metrics.num.samples = 2
> metadata.fetch.timeout.ms = 60000
>
> Is it the Kafka broker going down that gets the job killed? What's the
> best way to handle it? Will increasing retries and backoff time help, and
> to what values should those be set so that the streaming application
> never fails, but rather keeps retrying every few seconds and sends an
> event, so that my custom code can send a notification that the Kafka
> broker is down if that's the cause?
>
> Thanks