Your Kafka broker died, or you otherwise had a rebalance.

Normally Spark's retries take care of that.

Is there something going on with your Kafka installation that makes the
rebalance take especially long?

Yes, increasing the backoff / max number of retries will "help" (rough sketch
below), but it's better to figure out what's going on with Kafka.
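
For reference, this is roughly where those knobs live with the direct stream
API. Untested sketch - the app name, batch interval, and retry/backoff values
are placeholders, and the broker/topic names are just the ones from your log:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("kafka-direct-example")             // placeholder app name
  // Driver-side retries when leader offsets can't be found (default is 1).
  .set("spark.streaming.kafka.maxRetries", "5")

val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
  // How long the driver sleeps between those retries, so a leader
  // election has time to finish.
  "refresh.leader.backoff.ms" -> "2000"
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("testtopic"))

But again: if leader elections routinely take more than a couple of seconds,
the real fix is on the Kafka side.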

On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Hi
>
> My streaming application gets killed with the error below:
>
> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
> [testtopic,193]))
> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
> time 1440626120000 ms
> org.apache.spark.SparkException:
> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([testtopic,115]))
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
> at
>
>
>
> The Kafka params printed in the job logs are:
>         value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>         key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>         block.on.buffer.full = true
>         retry.backoff.ms = 100
>         buffer.memory = 1048576
>         batch.size = 16384
>         metrics.sample.window.ms = 30000
>         metadata.max.age.ms = 300000
>         receive.buffer.bytes = 32768
>         timeout.ms = 30000
>         max.in.flight.requests.per.connection = 5
>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>         metric.reporters = []
>         client.id =
>         compression.type = none
>         retries = 0
>         max.request.size = 1048576
>         send.buffer.bytes = 131072
>         acks = all
>         reconnect.backoff.ms = 10
>         linger.ms = 0
>         metrics.num.samples = 2
>         metadata.fetch.timeout.ms = 60000
>
>
> Is it the Kafka broker going down that gets the job killed? What's the best
> way to handle it?
> Will increasing retries and backoff time help, and what values should they
> be set to so that the streaming application never fails - rather, it keeps
> retrying every few seconds and sends an event so that my custom code can
> send a notification that the Kafka broker is down, if that is the cause?
>
>
> Thanks
>
>
