Hi Dibyendu,

Thank you for the answer. I will try the Spark-Kafka consumer.
Bill

On Sat, Nov 22, 2014 at 9:15 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> I believe this is something to do with how the Kafka High Level API manages
> consumers within a consumer group and how it re-balances during failure.
> You can find some mention of it in this Kafka wiki page:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
>
> Due to various issues in the Kafka High Level API, Kafka is moving the High
> Level Consumer API to a completely new set of APIs in Kafka 0.9.
>
> Other than this co-ordination issue, the High Level consumer also has data
> loss issues.
>
> You can probably try this Spark-Kafka consumer, which uses the Low Level
> Simple Consumer API. It is more performant and has no data loss scenarios.
>
> https://github.com/dibbhatt/kafka-spark-consumer
>
> Regards,
> Dibyendu
>
> On Sun, Nov 23, 2014 at 2:13 AM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am using Spark to consume from Kafka. However, after the job had run
>> for several hours, I saw the following failure of an executor:
>>
>> kafka.common.ConsumerRebalanceFailedException:
>> group-1416624735998_ip-172-31-5-242.ec2.internal-1416648124230-547d2c31
>> can't rebalance after 4 retries
>>
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
>> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
>> kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
>> kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>> org.apache.spark.scheduler.Task.run(Task.scala:54)
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> java.lang.Thread.run(Thread.java:745)
>>
>> Does anyone know the reason for this exception? Thanks!
>>
>> Bill
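
Before switching consumers, a commonly cited workaround for this
ConsumerRebalanceFailedException is to give the high-level consumer more
rebalance headroom. The "can't rebalance after 4 retries" in the trace matches
the Kafka 0.8 default of rebalance.max.retries = 4. Below is a minimal sketch,
assuming the receiver-based KafkaUtils.createStream API from the
spark-streaming-kafka 0.8 module; the topic name, group id, ZooKeeper address,
and retry values are placeholders, not taken from this thread.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaRebalanceRetrySketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("kafka-rebalance-retry-sketch"), Seconds(10))

        val kafkaParams = Map(
          "zookeeper.connect" -> "zk-host:2181", // placeholder ZooKeeper address
          "group.id"          -> "my-group",     // placeholder consumer group
          // Defaults are 4 retries with a 2000 ms backoff; raising both gives
          // a rebalance more time to succeed when another consumer in the
          // group is slow to release its partitions.
          "rebalance.max.retries" -> "10",
          "rebalance.backoff.ms"  -> "5000"
        )

        // Receiver-based stream backed by the Kafka high-level consumer; the
        // kafkaParams above are passed straight through to that consumer.
        val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

        stream.map(_._2).print() // payloads only, just to exercise the stream
        ssc.start()
        ssc.awaitTermination()
      }
    }

The rule of thumb usually quoted is to pick values so that
rebalance.backoff.ms * rebalance.max.retries comfortably exceeds
zookeeper.session.timeout.ms (6000 ms by default), so a failed consumer's
ZooKeeper session can expire before the retries are exhausted.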