Repository: kafka
Updated Branches:
  refs/heads/trunk ffb81a581 -> e22f8eded
KAFKA-1510; Do full (unfiltered) offset commits when offsets storage is set to Kafka; reviewed by Joel Koshy

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e22f8ede
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e22f8ede
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e22f8ede

Branch: refs/heads/trunk
Commit: e22f8ededaf838e7cec9dd22975d8461764ab076
Parents: ffb81a5
Author: Nicu Marasoiu <[email protected]>
Authored: Fri Sep 5 12:20:31 2014 -0700
Committer: Joel Koshy <[email protected]>
Committed: Fri Sep 5 12:20:31 2014 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala | 26 +++++++++-----------
 1 file changed, 11 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/e22f8ede/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 21f3e00..fbc680f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -89,7 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-  private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
+  private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
   private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
   private val messageStreamCreated = new AtomicBoolean(false)
@@ -277,9 +277,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
-    val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
-    updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
-    zkCommitMeter.mark()
+    if (checkpointedZkOffsets.get(topicPartition) != offset) {
+      val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
+      updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
+      checkpointedZkOffsets.put(topicPartition, offset)
+      zkCommitMeter.mark()
+    }
   }
 
   def commitOffsets(isAutoCommit: Boolean = true) {
@@ -289,10 +292,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     while (!done) {
       val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
         val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
-          partitionTopicInfos.filterNot { case (partition, info) =>
-            val newOffset = info.getConsumeOffset()
-            newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))
-          }.map { case (partition, info) =>
+          partitionTopicInfos.map { case (partition, info) =>
             TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
           }
         }.toSeq:_*)
@@ -301,7 +301,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           if (config.offsetsStorage == "zookeeper") {
             offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) =>
               commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
-              checkpointedOffsets.put(topicAndPartition, offsetAndMetadata.offset)
             }
             true
           } else {
@@ -316,12 +315,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
                 offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) =>
 
-                  if (errorCode == ErrorMapping.NoError) {
-                    val offset = offsetsToCommit(topicPartition).offset
-                    checkpointedOffsets.put(topicPartition, offset)
-                    if (config.dualCommitEnabled) {
+                  if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) {
+                      val offset = offsetsToCommit(topicPartition).offset
                       commitOffsetToZooKeeper(topicPartition, offset)
-                    }
                   }
 
                   (folded._1 || // update commitFailed
@@ -808,7 +804,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  config.clientId)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
-      checkpointedOffsets.put(TopicAndPartition(topic, partition), offset)
+      checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
     }
   }
