[
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376261#comment-16376261
]
ASF GitHub Bot commented on KAFKA-2435:
---
hachikuji closed pull request #146: KAFKA-2435: Fair consumer partition
assignment strategy
URL: https://github.com/apache/kafka/pull/146
This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it has been merged,
the diff is reproduced below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 9eff3edd393..61c26a1a1c3 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -51,7 +51,7 @@ object ConsumerConfig extends Config {
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
val ExcludeInternalTopics = true
- val DefaultPartitionAssignmentStrategy = "range" /* select between "range",
and "roundrobin" */
+ val DefaultPartitionAssignmentStrategy = "range" /* select between "range",
"roundrobin", and "fair" */
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
val DefaultClientId = ""
@@ -93,8 +93,9 @@ object ConsumerConfig extends Config {
strategy match {
case "range" =>
case "roundrobin" =>
+ case "fair" =>
case _ => throw new InvalidConfigException("Wrong value " + strategy + "
of partition.assignment.strategy in consumer config; " +
-"Valid values are 'range' and 'roundrobin'")
+"Valid values are 'range', 'roundrobin', and 'fair'")
}
}
}
@@ -193,7 +194,7 @@ class ConsumerConfig private (val props:
VerifiableProperties) extends ZKConfig(
/** Whether messages from internal topics (such as offsets) should be
exposed to the consumer. */
val excludeInternalTopics = props.getBoolean("exclude.internal.topics",
ExcludeInternalTopics)
- /** Select a strategy for assigning partitions to consumer streams. Possible
values: range, roundrobin */
+ /** Select a strategy for assigning partitions to consumer streams. Possible
values: range, roundrobin, fair */
val partitionAssignmentStrategy =
props.getString("partition.assignment.strategy",
DefaultPartitionAssignmentStrategy)
validate(this)
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index f02df352acf..42b35f7ec6d 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -37,6 +37,7 @@ trait PartitionAssignor {
object PartitionAssignor {
def createInstance(assignmentStrategy: String) = assignmentStrategy match {
case "roundrobin" => new RoundRobinAssignor()
+case "fair" => new FairAssignor()
case _ => new RangeAssignor()
}
}
@@ -162,3 +163,76 @@ class RangeAssignor() extends PartitionAssignor with
Logging {
partitionAssignment
}
}
+
+/**
+ * The fair assignor attempts to balance partitions across consumers such that
each consumer thread is assigned approximately
+ * the same number of partitions, even if the consumer topic subscriptions are
substantially different (if they are identical,
+ * then the result will be equivalent to that of the roundrobin assignor). The
running total of assignments per consumer
+ * thread is tracked as the algorithm executes in order to accomplish this.
+ *
+ * The algorithm starts with the topic with the fewest consumer subscriptions,
and assigns its partitions in roundrobin
+ * fashion. In the event of a tie for least subscriptions, the topic with the
highest partition count is assigned first, as
+ * this generally creates a more balanced distribution. The final tiebreaker
is the topic name.
+ *
+ * The partitions for subsequent topics are assigned to the subscribing
consumer with the fewest number of assignments.
+ * In the event of a tie for least assignments, the tiebreaker is the consumer
id, so that the assignment pattern is fairly
+ * similar to how the roundrobin assignor functions.
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and
t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. If both C0
and C1 are consuming t0, but only C1 is
+ * consuming t1 then the assignment will be:
+ * C0 -> [t0p0, t0p1, t0p2]
+ * C1 -> [t1p0, t1p1, t1p2]
+ */
+class FairAssignor() extends PartitionAssignor with Logging {
+
+ def assign(ctx: AssignmentContext) = {
+val valueFactory = (topic: String) => new
mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+val partitionAssignment =
+