KAFKA-987: Avoid checkpointing Kafka consumer offsets that have not changed since the last commit; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/401d5919 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/401d5919 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/401d5919 Branch: refs/heads/trunk Commit: 401d59199cf727f5001dcec3eaa05fa9cdfd079e Parents: ce7d588 Author: Swapnil Ghike <[email protected]> Authored: Tue Jul 23 11:18:50 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Tue Jul 23 11:19:24 2013 -0700 ---------------------------------------------------------------------- .../consumer/ZookeeperConsumerConnector.scala | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/401d5919/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e3944d5..a2ea5a9 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -85,6 +85,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var fetcher: Option[ConsumerFetcherManager] = None private var zkClient: ZkClient = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] + private var checkpointedOffsets = new Pool[TopicAndPartition, Long] private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]] private val scheduler = new KafkaScheduler(1) private val messageStreamCreated = new AtomicBoolean(false) @@ -249,15 +250,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val topicDirs = new ZKGroupTopicDirs(config.groupId, 
topic) for (info <- infos.values) { val newOffset = info.getConsumeOffset - try { - updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId, - newOffset.toString) - } catch { - case t: Throwable => - // log it and let it go - warn("exception during commitOffsets", t) + if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))) { + try { + updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId, newOffset.toString) + checkpointedOffsets.put(TopicAndPartition(topic, info.partitionId), newOffset) + } catch { + case t: Throwable => + // log it and let it go + warn("exception during commitOffsets", t) + } + debug("Committed offset " + newOffset + " for topic " + info) } - debug("Committed offset " + newOffset + " for topic " + info) } } } @@ -607,6 +610,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.clientId) partTopicInfoMap.put(partition, partTopicInfo) debug(partTopicInfo + " selected new offset " + offset) + checkpointedOffsets.put(TopicAndPartition(topic, partition), offset) } }
