Re: spark streaming 1.3 kafka topic error
You can't set it to less than 1. Just set it to max int if that's really what you want to do.

On Mon, Aug 31, 2015 at 6:00 AM, Shushant Arora wrote:
> Say my cluster intermittently takes a long time to rebalance for some reason. To handle that, can I have infinite retries instead of killing the app? What should the value of retries be -- will -1 work, or something else?
>
> On Thu, Aug 27, 2015 at 6:46 PM, Cody Koeninger wrote:
>> Your kafka broker died or you otherwise had a rebalance.
>>
>> Normally spark retries take care of that.
>>
>> Is there something going on with your kafka installation, that rebalance is taking especially long?
>>
>> Yes, increasing backoff / max number of retries will "help", but it's better to figure out what's going on with kafka.
>>
>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>> Hi,
>>>
>>> My streaming application gets killed with the error below:
>>>
>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100], [testtopic,193]))
>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for time 144062612 ms
>>> org.apache.spark.SparkException:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,115]))
>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>> at ...
>>>
>>> Kafka params printed in the job logs are:
>>> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>> block.on.buffer.full = true
>>> retry.backoff.ms = 100
>>> buffer.memory = 1048576
>>> batch.size = 16384
>>> metrics.sample.window.ms = 3
>>> metadata.max.age.ms = 30
>>> receive.buffer.bytes = 32768
>>> timeout.ms = 3
>>> max.in.flight.requests.per.connection = 5
>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>> metric.reporters = []
>>> client.id =
>>> compression.type = none
>>> retries = 0
>>> max.request.size = 1048576
>>> send.buffer.bytes = 131072
>>> acks = all
>>> reconnect.backoff.ms = 10
>>> linger.ms = 0
>>> metrics.num.samples = 2
>>> metadata.fetch.timeout.ms = 6
>>>
>>> Is it the kafka broker going down and the job getting killed? What's the best way to handle it?
>>> Will increasing retries and backoff time help, and what values should those be set to so the streaming application never fails -- rather it keeps retrying every few seconds and emits an event, so that my custom code can send a notification that the kafka broker is down, if that's the cause?
>>>
>>> Thanks
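For context on the knobs being discussed for the Spark 1.3 direct stream: if memory serves, the retry count around latestLeaderOffsets is the spark.streaming.kafka.maxRetries setting, and the sleep between those retries comes from the refresh.leader.backoff.ms Kafka param. A minimal sketch of wiring them up, reusing the broker list and topic from the log above; the app name, batch interval, and backoff value are illustrative assumptions, not recommendations:

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("kafka-direct-retries")
      // Attempts the direct stream makes to find leader offsets before failing
      // batch generation; it can't be truly infinite, so use Int.MaxValue.
      .set("spark.streaming.kafka.maxRetries", Int.MaxValue.toString)

    val ssc = new StreamingContext(conf, Seconds(30))  // batch interval is illustrative

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
      // Sleep between those retries while the cluster finishes its rebalance.
      "refresh.leader.backoff.ms" -> "2000"
    )

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("testtopic"))

Even with a large retry count, a long rebalance still stalls batch generation, which is why the advice above is to find out why the rebalance is slow rather than only raising the limits.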
Re: spark streaming 1.3 kafka topic error
Say my cluster intermittently takes a long time to rebalance for some reason. To handle that, can I have infinite retries instead of killing the app? What should the value of retries be -- will -1 work, or something else?

On Thu, Aug 27, 2015 at 6:46 PM, Cody Koeninger wrote:
> Your kafka broker died or you otherwise had a rebalance.
>
> Normally spark retries take care of that.
>
> Is there something going on with your kafka installation, that rebalance is taking especially long?
>
> Yes, increasing backoff / max number of retries will "help", but it's better to figure out what's going on with kafka.
Re: spark streaming 1.3 kafka topic error
Dears,

I need to commit a DB transaction for each partition, not for each row. The below didn't work for me:

rdd.mapPartitions(partitionOfRecords => {
  DBConnectionInit()
  val results = partitionOfRecords.map(...)
  DBConnection.commit()
})

Best regards,
Ahmed Atef Nawwar
Data Management & Big Data Consultant
Re: spark streaming 1.3 kafka topic error
Map is lazy. You need an actual action, or nothing will happen. Use foreachPartition, or do an empty foreach after the map.

On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:
> Dears,
>
> I need to commit a DB transaction for each partition, not for each row. The below didn't work for me:
>
> rdd.mapPartitions(partitionOfRecords => {
>   DBConnectionInit()
>   val results = partitionOfRecords.map(...)
>   DBConnection.commit()
> })
>
> Best regards,
> Ahmed Atef Nawwar
> Data Management & Big Data Consultant
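To make that concrete, a minimal sketch of the per-partition commit pattern using foreachRDD plus foreachPartition, assuming a DStream named stream like the one created earlier in the thread; the JDBC URL, credentials, table, and SQL are placeholder assumptions, not anything from the thread:

    import java.sql.DriverManager

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // One connection and one transaction per partition, not per record.
        val conn = DriverManager.getConnection(
          "jdbc:postgresql://dbhost:5432/mydb", "user", "password")  // placeholder URL/credentials
        conn.setAutoCommit(false)
        try {
          val stmt = conn.prepareStatement("INSERT INTO events (payload) VALUES (?)")
          partitionOfRecords.foreach { record =>
            stmt.setString(1, record.toString)  // adapt to the real record type
            stmt.addBatch()
          }
          stmt.executeBatch()
          conn.commit()  // commit once for the whole partition
        } finally {
          conn.close()
        }
      }
    }

Because foreachPartition is an action, the batch inserts and the commit actually execute; with mapPartitions alone the iterator is never consumed, so the body never runs, which is why the original snippet appeared to do nothing.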