Repository: kafka Updated Branches: refs/heads/trunk 1fdb758f2 -> 96534a7d5
KAFKA-2336: Changing offsets.topic.num.partitions after the offset topic is created breaks consumer group partition assignment; Reviewed by Jiangjie Qin, Gwen Shapira Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96534a7d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96534a7d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96534a7d Branch: refs/heads/trunk Commit: 96534a7d502be58026feec3c2012f022bf330049 Parents: 1fdb758 Author: Grant Henke <[email protected]> Authored: Tue Aug 11 15:07:40 2015 -0700 Committer: Gwen Shapira <[email protected]> Committed: Tue Aug 11 15:07:40 2015 -0700 ---------------------------------------------------------------------- .../src/main/scala/kafka/server/OffsetManager.scala | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/96534a7d/core/src/main/scala/kafka/server/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 47b6ce9..0e613e7 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -96,6 +96,7 @@ class OffsetManager(val config: OffsetManagerConfig, private val loadingPartitions: mutable.Set[Int] = mutable.Set() private val cleanupOrLoadMutex = new Object private val shuttingDown = new AtomicBoolean(false) + private val offsetsTopicPartitionCount = getOffsetsTopicPartitionCount this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: " @@ -170,7 +171,7 @@ class OffsetManager(val config: OffsetManagerConfig, } - def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions + def partitionFor(group: String): Int = Utils.abs(group.hashCode) % offsetsTopicPartitionCount /** * Fetch the current offset for the given group/topic/partition from the underlying offsets storage. @@ -436,13 +437,24 @@ class OffsetManager(val config: OffsetManagerConfig, if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition." .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition))) - } def shutdown() { shuttingDown.set(true) } + /** + * Gets the partition count of the offsets topic from ZooKeeper. + * If the topic does not exist, the configured partition count is returned. + */ + private def getOffsetsTopicPartitionCount = { + val topic = ConsumerCoordinator.OffsetsTopicName + val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic)) + if (topicData(topic).nonEmpty) + topicData(topic).size + else + config.offsetsTopicNumPartitions + } } object OffsetManager {
