Hi Dibyendu,

Thank you for the answer. I will try the Spark-Kafka consumer.

Bill

On Sat, Nov 22, 2014 at 9:15 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> I believe this has something to do with how the Kafka High Level API manages
> consumers within a consumer group and how it re-balances them during
> failures. There is some discussion of this in the Kafka wiki:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
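>
> If you want to keep the High Level consumer for the moment, one setting you
> can try is rebalance.max.retries in the kafkaParams passed to
> KafkaUtils.createStream; it defaults to 4, which matches the 4 retries in
> the error below. A minimal sketch, with placeholder ZooKeeper/group/topic
> names and assuming an existing StreamingContext ssc:
>
>   import kafka.serializer.StringDecoder
>   import org.apache.spark.storage.StorageLevel
>   import org.apache.spark.streaming.kafka.KafkaUtils
>
>   val kafkaParams = Map(
>     "zookeeper.connect"     -> "zk-host:2181",       // placeholder ZK quorum
>     "group.id"              -> "my-consumer-group",  // placeholder group id
>     "rebalance.max.retries" -> "10",                 // default is 4
>     "rebalance.backoff.ms"  -> "5000"                // wait longer between retries
>   )
>
>   val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)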
>
> Due to various issues with the High Level API, Kafka is moving the High
> Level Consumer to a completely new consumer API in Kafka 0.9.
>
> Besides this co-ordination issue, the High Level consumer also has data
> loss issues.
>
> You can also try this Spark-Kafka consumer, which uses the Low Level Simple
> Consumer API; it is more performant and has no data loss scenarios.
>
> https://github.com/dibbhatt/kafka-spark-consumer
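>
> For reference, the Low Level Simple Consumer API that this project builds
> on lets you fetch from an explicit broker, partition and offset and manage
> offsets yourself, so there is no consumer-group rebalancing at all. A
> bare-bones fetch sketch (broker, topic, partition and start offset are
> placeholders):
>
>   import kafka.api.FetchRequestBuilder
>   import kafka.javaapi.consumer.SimpleConsumer
>   import scala.collection.JavaConverters._
>
>   // host, port, socket timeout (ms), buffer size (bytes), client id
>   val consumer = new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "example-client")
>   val request = new FetchRequestBuilder()
>     .clientId("example-client")
>     .addFetch("my-topic", 0, 0L, 100000)  // topic, partition, start offset, fetch size
>     .build()
>   val response = consumer.fetch(request)
>   for (messageAndOffset <- response.messageSet("my-topic", 0).asScala) {
>     // read messageAndOffset.message and track messageAndOffset.nextOffset yourself
>   }
>   consumer.close()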
>
> Regards,
> Dibyendu
>
> On Sun, Nov 23, 2014 at 2:13 AM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am using Spark to consume from Kafka. However, after the job had run
>> for several hours, I saw the following failure on an executor:
>>
>> kafka.common.ConsumerRebalanceFailedException: 
>> group-1416624735998_ip-172-31-5-242.ec2.internal-1416648124230-547d2c31 
>> can't rebalance after 4 retries
>>         
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
>>         
>> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
>>         
>> kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
>>         
>> kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
>>         
>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
>>         
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>         
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>         
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>         
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>         
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>         
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>         
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         java.lang.Thread.run(Thread.java:745)
>>
>>
>> Does anyone know the reason for this exception? Thanks!
>>
>> Bill
>>
>>
>
