Hi,
I am trying to shutdown kafka consumer (version 0.8.2) gracefully by
calling consumer.shutdown (ConsumerConnector.shutdown) and then waiting for
executor threads to finish. However what I have noticed is that during next
start, some of the messages are replayed. I have auto commit enabled.

I looked at the code of kafka.consumer.ZookeeperConsumerConnector
<https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala>
  and realised that there is no commitOffset call during shutdown. Here is
the code snippet from ZookeeperConsumerConnector class. Ideally after
shutting down fetchers, commit offset should have been called to commit all
offsets fetched till now. Also I noticed that during shutdown, zkClient is
being closed, so I can not call commitOffset from outside also after
shutdown.

Is this expected behaviour? Or there is anything I am missing. Is there any
way using high level consumer to make sure all offsets are committed before
shutting down?


def shutdown() {
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
logger.info("ZKConsumerConnector shutting down")
try {
scheduler.shutdown
fetcher match {
case Some(f) => f.shutdown
case None =>
}
sendShudownToAllQueues
if (zkClient != null) {
zkClient.close()
zkClient = null
}
}
catch {
case e =>
logger.fatal(e)
logger.fatal(Utils.stackTrace(e))
}
logger.info("ZKConsumerConnector shut down completed")
}
}

Reply via email to