Repository: kafka Updated Branches: refs/heads/trunk a314461fa -> 58e58529b
KAFKA-1648; Robin consumer balance throws an NPE when there are no topics Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/58e58529 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/58e58529 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/58e58529 Branch: refs/heads/trunk Commit: 58e58529b350a3da860b1f51fdfa356dfc42761f Parents: a314461 Author: Mayuresh Gharat <[email protected]> Authored: Thu Oct 9 16:34:40 2014 -0700 Committer: Joel Koshy <[email protected]> Committed: Thu Oct 9 16:34:40 2014 -0700 ---------------------------------------------------------------------- .../kafka/consumer/PartitionAssignor.scala | 62 ++++++++++---------- .../kafka/consumer/PartitionAssignorTest.scala | 2 +- 2 files changed, 33 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/58e58529/core/src/main/scala/kafka/consumer/PartitionAssignor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 8ea7368..e6ff768 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -71,39 +71,41 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() - // check conditions (a) and (b) - val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) - ctx.consumersForTopic.foreach { case (topic, threadIds) => - val threadIdSet = threadIds.toSet - require(threadIdSet == headThreadIdSet, - "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " + - "AND if the stream counts across topics are identical for a given consumer instance.\n" + - "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) + - "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) - } + if (ctx.consumersForTopic.size > 0) { + // check conditions (a) and (b) + val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) + ctx.consumersForTopic.foreach { case (topic, threadIds) => + val threadIdSet = threadIds.toSet + require(threadIdSet == headThreadIdSet, + "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " + + "AND if the stream counts across topics are identical for a given consumer instance.\n" + + "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) + + "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) + } - val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted) + val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted) + + info("Starting round-robin assignment with consumers " + ctx.consumers) + val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) => + info("Consumer %s rebalancing the following partitions for topic %s: %s" + .format(ctx.consumerId, topic, partitions)) + partitions.map(partition => { + TopicAndPartition(topic, partition) + }) + }.toSeq.sortWith((topicPartition1, topicPartition2) => { + /* + * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending + * up on one consumer (if it has a high enough stream count). + */ + topicPartition1.toString.hashCode < topicPartition2.toString.hashCode + }) - info("Starting round-robin assignment with consumers " + ctx.consumers) - val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) => - info("Consumer %s rebalancing the following partitions for topic %s: %s" - .format(ctx.consumerId, topic, partitions)) - partitions.map(partition => { - TopicAndPartition(topic, partition) + allTopicPartitions.foreach(topicPartition => { + val threadId = threadAssignor.next() + if (threadId.consumer == ctx.consumerId) + partitionOwnershipDecision += (topicPartition -> threadId) }) - }.toSeq.sortWith((topicPartition1, topicPartition2) => { - /* - * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending - * up on one consumer (if it has a high enough stream count). - */ - topicPartition1.toString.hashCode < topicPartition2.toString.hashCode - }) - - allTopicPartitions.foreach(topicPartition => { - val threadId = threadAssignor.next() - if (threadId.consumer == ctx.consumerId) - partitionOwnershipDecision += (topicPartition -> threadId) - }) + } partitionOwnershipDecision } http://git-wip-us.apache.org/repos/asf/kafka/blob/58e58529/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index 9ceae22..24954de 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -87,7 +87,7 @@ private object PartitionAssignorTest extends Logging { private val MaxConsumerCount = 10 private val MaxStreamCount = 8 private val MaxTopicCount = 100 - private val MinTopicCount = 20 + private val MinTopicCount = 0 private val MaxPartitionCount = 120 private val MinPartitionCount = 8
