Re: spark streaming 1.3 kafka topic error

2015-08-31 Thread Cody Koeninger
You can't set it to less than 1

Just set it to max int if that's really what you want to do
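
Roughly, that would look like the sketch below (untested; it assumes
spark.streaming.kafka.maxRetries is the conf the direct stream consults when
looking up leader offsets, and that refresh.leader.backoff.ms in the
kafkaParams controls the sleep between attempts - the broker list and topic
name are just placeholders taken from your log):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("direct-kafka-retries")
  // can't be set below 1; use Int.MaxValue if you effectively never want to give up
  .set("spark.streaming.kafka.maxRetries", Int.MaxValue.toString)

val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
  // assumed wait between leader-offset retry attempts
  "refresh.leader.backoff.ms" -> "1000")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("testtopic"))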

On Mon, Aug 31, 2015 at 6:00 AM, Shushant Arora 
wrote:

> Say my cluster intermittently takes a long time to rebalance for some
> reason. To handle that, can I have infinite retries instead of killing the
> app? What should the value of retries be - will (-1) work, or something
> else?
>
> On Thu, Aug 27, 2015 at 6:46 PM, Cody Koeninger 
> wrote:
>
>> Your kafka broker died or you otherwise had a rebalance.
>>
>> Normally spark retries take care of that.
>>
>> Is there something going on with your kafka installation that makes
>> rebalance take especially long?
>>
>> Yes, increasing backoff / max number of retries will "help", but it's
>> better to figure out what's going on with kafka.
>>
>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> My streaming application gets killed with below error
>>>
>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>> [testtopic,193]))
>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>> for time 144062612 ms
>>> org.apache.spark.SparkException:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([testtopic,115]))
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>> at
>>>
>>>
>>>
>>> Kafka params in job logs printed are :
>>>  value.serializer = class
>>> org.apache.kafka.common.serialization.StringSerializer
>>> key.serializer = class
>>> org.apache.kafka.common.serialization.StringSerializer
>>> block.on.buffer.full = true
>>> retry.backoff.ms = 100
>>> buffer.memory = 1048576
>>> batch.size = 16384
>>> metrics.sample.window.ms = 3
>>> metadata.max.age.ms = 30
>>> receive.buffer.bytes = 32768
>>> timeout.ms = 3
>>> max.in.flight.requests.per.connection = 5
>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>> metric.reporters = []
>>> client.id =
>>> compression.type = none
>>> retries = 0
>>> max.request.size = 1048576
>>> send.buffer.bytes = 131072
>>> acks = all
>>> reconnect.backoff.ms = 10
>>> linger.ms = 0
>>> metrics.num.samples = 2
>>> metadata.fetch.timeout.ms = 6
>>>
>>>
>>> Is the kafka broker going down and killing the job? What's the best way
>>> to handle it?
>>> Will increasing retries and backoff time help, and what values should
>>> those be set to so the streaming application never fails - rather, it
>>> keeps retrying every few seconds and sends an event so that my custom
>>> code can notify that the kafka broker is down, if that's the cause.
>>>
>>>
>>> Thanks
>>>
>>>
>>
>


Re: spark streaming 1.3 kafka topic error

2015-08-31 Thread Shushant Arora
Say my cluster intermittently takes a long time to rebalance for some
reason. To handle that, can I have infinite retries instead of killing the
app? What should the value of retries be - will (-1) work, or something else?

On Thu, Aug 27, 2015 at 6:46 PM, Cody Koeninger  wrote:

> Your kafka broker died or you otherwise had a rebalance.
>
> Normally spark retries take care of that.
>
> Is there something going on with your kafka installation that makes
> rebalance take especially long?
>
> Yes, increasing backoff / max number of retries will "help", but it's
> better to figure out what's going on with kafka.
>
> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora  > wrote:
>
>> Hi
>>
>> My streaming application gets killed with below error
>>
>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>> [testtopic,193]))
>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
>> time 144062612 ms
>> org.apache.spark.SparkException:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,115]))
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>>
>>
>>
>> Kafka params in job logs printed are :
>>  value.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>> key.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>> block.on.buffer.full = true
>> retry.backoff.ms = 100
>> buffer.memory = 1048576
>> batch.size = 16384
>> metrics.sample.window.ms = 3
>> metadata.max.age.ms = 30
>> receive.buffer.bytes = 32768
>> timeout.ms = 3
>> max.in.flight.requests.per.connection = 5
>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>> metric.reporters = []
>> client.id =
>> compression.type = none
>> retries = 0
>> max.request.size = 1048576
>> send.buffer.bytes = 131072
>> acks = all
>> reconnect.backoff.ms = 10
>> linger.ms = 0
>> metrics.num.samples = 2
>> metadata.fetch.timeout.ms = 6
>>
>>
>> Is the kafka broker going down and killing the job? What's the best way
>> to handle it?
>> Will increasing retries and backoff time help, and what values should
>> those be set to so the streaming application never fails - rather, it
>> keeps retrying every few seconds and sends an event so that my custom
>> code can notify that the kafka broker is down, if that's the cause.
>>
>>
>> Thanks
>>
>>
>


Re: spark streaming 1.3 kafka topic error

2015-08-27 Thread Ahmed Nawar
Dears,

I need to commit a DB transaction for each partition, not for each row.
The code below didn't work for me.


rdd.mapPartitions(partitionOfRecords => {
  DBConnectionInit()
  val results = partitionOfRecords.map(..)
  DBConnection.commit()
})



Best regards,

Ahmed Atef Nawwar

Data Management & Big Data Consultant






On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger c...@koeninger.org wrote:

 Your kafka broker died or you otherwise had a rebalance.

 Normally spark retries take care of that.

 Is there something going on with your kafka installation that makes
 rebalance take especially long?

 Yes, increasing backoff / max number of retries will help, but it's
 better to figure out what's going on with kafka.

 On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Hi

 My streaming application gets killed with below error

 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
 ArrayBuffer(kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
 [testtopic,193]))
 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
 time 144062612 ms
 org.apache.spark.SparkException:
 ArrayBuffer(kafka.common.NotLeaderForPartitionException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([testtopic,115]))
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at



 Kafka params in job logs printed are :
  value.serializer = class
 org.apache.kafka.common.serialization.StringSerializer
 key.serializer = class
 org.apache.kafka.common.serialization.StringSerializer
 block.on.buffer.full = true
 retry.backoff.ms = 100
 buffer.memory = 1048576
 batch.size = 16384
 metrics.sample.window.ms = 3
 metadata.max.age.ms = 30
 receive.buffer.bytes = 32768
 timeout.ms = 3
 max.in.flight.requests.per.connection = 5
 bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
 metric.reporters = []
 client.id =
 compression.type = none
 retries = 0
 max.request.size = 1048576
 send.buffer.bytes = 131072
 acks = all
 reconnect.backoff.ms = 10
 linger.ms = 0
 metrics.num.samples = 2
 metadata.fetch.timeout.ms = 6


 Is the kafka broker going down and killing the job? What's the best way to
 handle it?
 Will increasing retries and backoff time help, and what values should those
 be set to so the streaming application never fails - rather, it keeps
 retrying every few seconds and sends an event so that my custom code can
 notify that the kafka broker is down, if that's the cause.


 Thanks





Re: spark streaming 1.3 kafka topic error

2015-08-27 Thread Cody Koeninger
Map is lazy.  You need an actual action, or nothing will happen.  Use
foreachPartition, or do an empty foreach after the map.
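
Something like the following (untested sketch, keeping your DBConnectionInit
and DBConnection names as stand-ins for whatever connection handling you
already have):

rdd.foreachPartition { partitionOfRecords =>
  // foreachPartition is an action, so this body actually runs once per partition
  DBConnectionInit()
  partitionOfRecords.foreach { record =>
    // process / write each record here
  }
  DBConnection.commit()
}

If you need the mapped results downstream, keep the mapPartitions and add an
empty foreach on the result so it actually gets evaluated.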

On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar ahmed.na...@gmail.com wrote:

 Dears,

 I need to commit a DB transaction for each partition, not for each row.
 The code below didn't work for me.


 rdd.mapPartitions(partitionOfRecords => {
   DBConnectionInit()
   val results = partitionOfRecords.map(..)
   DBConnection.commit()
 })



 Best regards,

 Ahmed Atef Nawwar

 Data Management & Big Data Consultant






 On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Your kafka broker died or you otherwise had a rebalance.

 Normally spark retries take care of that.

 Is there something going on with your kafka installation that makes
 rebalance take especially long?

 Yes, increasing backoff / max number of retries will help, but it's
 better to figure out what's going on with kafka.

 On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 My streaming application gets killed with below error

 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
 ArrayBuffer(kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
 [testtopic,193]))
 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
 for time 144062612 ms
 org.apache.spark.SparkException:
 ArrayBuffer(kafka.common.NotLeaderForPartitionException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([testtopic,115]))
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at



 Kafka params in job logs printed are :
  value.serializer = class
 org.apache.kafka.common.serialization.StringSerializer
 key.serializer = class
 org.apache.kafka.common.serialization.StringSerializer
 block.on.buffer.full = true
 retry.backoff.ms = 100
 buffer.memory = 1048576
 batch.size = 16384
 metrics.sample.window.ms = 3
 metadata.max.age.ms = 30
 receive.buffer.bytes = 32768
 timeout.ms = 3
 max.in.flight.requests.per.connection = 5
 bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
 metric.reporters = []
 client.id =
 compression.type = none
 retries = 0
 max.request.size = 1048576
 send.buffer.bytes = 131072
 acks = all
 reconnect.backoff.ms = 10
 linger.ms = 0
 metrics.num.samples = 2
 metadata.fetch.timeout.ms = 6


 Is the kafka broker going down and killing the job? What's the best way to
 handle it?
 Will increasing retries and backoff time help, and what values should those
 be set to so the streaming application never fails - rather, it keeps
 retrying every few seconds and sends an event so that my custom code can
 notify that the kafka broker is down, if that's the cause.


 Thanks