[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376261#comment-16376261
 ] 

ASF GitHub Bot commented on KAFKA-2435:
---------------------------------------

hachikuji closed pull request #146: KAFKA-2435: Fair consumer partition 
assignment strategy
URL: https://github.com/apache/kafka/pull/146
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 9eff3edd393..61c26a1a1c3 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -51,7 +51,7 @@ object ConsumerConfig extends Config {
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val ExcludeInternalTopics = true
-  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", 
and "roundrobin" */
+  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", 
"roundrobin", and "fair" */
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
 
@@ -93,8 +93,9 @@ object ConsumerConfig extends Config {
     strategy match {
       case "range" =>
       case "roundrobin" =>
+      case "fair" =>
       case _ => throw new InvalidConfigException("Wrong value " + strategy + " 
of partition.assignment.strategy in consumer config; " +
-        "Valid values are 'range' and 'roundrobin'")
+        "Valid values are 'range', 'roundrobin', and 'fair'")
     }
   }
 }
@@ -193,7 +194,7 @@ class ConsumerConfig private (val props: 
VerifiableProperties) extends ZKConfig(
   /** Whether messages from internal topics (such as offsets) should be 
exposed to the consumer. */
   val excludeInternalTopics = props.getBoolean("exclude.internal.topics", 
ExcludeInternalTopics)
 
-  /** Select a strategy for assigning partitions to consumer streams. Possible 
values: range, roundrobin */
+  /** Select a strategy for assigning partitions to consumer streams. Possible 
values: range, roundrobin, fair */
   val partitionAssignmentStrategy = 
props.getString("partition.assignment.strategy", 
DefaultPartitionAssignmentStrategy)
   
   validate(this)
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala 
b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index f02df352acf..42b35f7ec6d 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -37,6 +37,7 @@ trait PartitionAssignor {
 object PartitionAssignor {
   def createInstance(assignmentStrategy: String) = assignmentStrategy match {
     case "roundrobin" => new RoundRobinAssignor()
+    case "fair" => new FairAssignor()
     case _ => new RangeAssignor()
   }
 }
@@ -162,3 +163,76 @@ class RangeAssignor() extends PartitionAssignor with 
Logging {
     partitionAssignment
   }
 }
+
+/**
+ * The fair assignor attempts to balance partitions across consumers such that 
each consumer thread is assigned approximately
+ * the same number of partitions, even if the consumer topic subscriptions are 
substantially different (if they are identical,
+ * then the result will be equivalent to that of the roundrobin assignor). The 
running total of assignments per consumer
+ * thread is tracked as the algorithm executes in order to accomplish this.
+ *
+ * The algorithm starts with the topic with the fewest consumer subscriptions, 
and assigns its partitions in roundrobin
+ * fashion. In the event of a tie for least subscriptions, the topic with the 
highest partition count is assigned first, as
+ * this generally creates a more balanced distribution. The final tiebreaker 
is the topic name.
+ *
+ * The partitions for subsequent topics are assigned to the subscribing 
consumer with the fewest number of assignments.
+ * In the event of a tie for least assignments, the tiebreaker is the consumer 
id, so that the assignment pattern is fairly
+ * similar to how the roundrobin assignor functions.
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and 
t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. If both C0 
and C1 are consuming t0, but only C1 is
+ * consuming t1 then the assignment will be:
+ * C0 -> [t0p0, t0p1, t0p2]
+ * C1 -> [t1p0, t1p1, t1p2]
+ */
+class FairAssignor() extends PartitionAssignor with Logging {
+
+  def assign(ctx: AssignmentContext) = {
+    val valueFactory = (topic: String) => new 
mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+    val partitionAssignment =
+      new Pool[String, mutable.Map[TopicAndPartition, 
ConsumerThreadId]](Some(valueFactory))
+
+    if (ctx.consumersForTopic.size > 0) {
+      val allThreadIds = ctx.consumersForTopic.flatMap { case (topic, 
threadIds) => threadIds }
+
+      // Map for tracking the total number of partitions assigned to each 
consumer thread
+      val consumerAssignmentCounts: mutable.Map[ConsumerThreadId, Int] = 
mutable.Map()
+      for (threadId <- allThreadIds) {
+        consumerAssignmentCounts(threadId) = 0
+      }
+
+      // Assign topics with fewer consumers first, tiebreakers are most 
partitions, then topic name
+      val topicConsumerCounts = ctx.consumersForTopic.map { case(topic, 
threadIds) =>
+        (topic -> threadIds.size)
+      }.toList.sortBy {
+        count => (count._2, -ctx.partitionsForTopic(count._1).size, count._1)
+      }
+
+      val allTopicPartitions = topicConsumerCounts.flatMap { 
topicConsumerCount =>
+        val topic = topicConsumerCount._1
+        val partitions = ctx.partitionsForTopic(topic)
+        info("Consumer %s rebalancing the following partitions for topic %s: 
%s"
+            .format(ctx.consumerId, topic, partitions))
+        ctx.partitionsForTopic(topic).map(partition => {
+            TopicAndPartition(topic, partition)
+        })
+      }
+
+      allTopicPartitions.foreach(topicPartition => {
+        val topicConsumers = ctx.consumersForTopic(topicPartition.topic)
+        val filteredCounts = consumerAssignmentCounts.toList.filter(consumer 
=> topicConsumers.contains(consumer._1))
+
+        // Assign partition to consumer thread with least assignments, 
tiebreaker is consumer thread id
+        val threadId = filteredCounts.sortBy(count => (count._2, 
count._1.toString)).head._1
+        consumerAssignmentCounts(threadId) += 1
+
+        // record the partition ownership decision
+        val assignmentForConsumer = 
partitionAssignment.getAndMaybePut(threadId.consumer)
+        assignmentForConsumer += (topicPartition -> threadId)
+      })
+    }
+
+    // assign Map.empty for the consumers which are not associated with topic 
partitions
+    ctx.consumers.foreach(consumerId => 
partitionAssignment.getAndMaybePut(consumerId))
+    partitionAssignment
+  }
+}
diff --git 
a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala 
b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 1e45bfbf6e9..0754e47a0b2 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -59,25 +59,25 @@ class PartitionAssignorTest extends Logging {
     val assignor = new RoundRobinAssignor
 
     /** test static subscription scenarios */
-    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
       val consumerCount = 
1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
       val topicCount = 
PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount
 + 1))
 
       val topicPartitionCounts = Map((1 to topicCount).map(topic => {
         ("topic-" + topic, 
PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
-      }).toSeq:_*)
+      }):_*)
 
       val subscriptions = Map((1 to consumerCount).map(consumer => {
         val streamCounts = Map((1 to topicCount).map(topic => {
             ("topic-" + topic, 1)
-          }).toSeq:_*)
+          }):_*)
         ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
-      }).toSeq:_*)
+      }):_*)
       val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
       val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
       EasyMock.replay(zkUtils.zkClient)
       PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, 
verifyAssignmentIsUniform = true)
-    })
+    }
   }
 
   @Test
@@ -86,27 +86,27 @@ class PartitionAssignorTest extends Logging {
     val minConsumerCount = 5
 
     /** test unbalanced static subscription scenarios */
-    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
       val consumerCount = 
minConsumerCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount
 + 1))
       val topicCount = 10
 
       val topicPartitionCounts = Map((1 to topicCount).map(topic => {
         ("topic-" + topic, 10)
-      }).toSeq:_*)
+      }):_*)
 
       val subscriptions = Map((1 to consumerCount).map(consumer => {
         // Exclude some topics from some consumers
         val topicRange = (1 to topicCount - consumer % minConsumerCount)
         val streamCounts = Map(topicRange.map(topic => {
             ("topic-" + topic, 3)
-          }).toSeq:_*)
+          }):_*)
         ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
-      }).toSeq:_*)
+      }):_*)
       val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
       val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
       EasyMock.replay(zkUtils.zkClient)
       PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
-    })
+    }
   }
 
   @Test
@@ -134,6 +134,85 @@ class PartitionAssignorTest extends Logging {
       PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
     }
   }
+
+  @Test
+  def testFairPartitionAssignor() {
+    val assignor = new FairAssignor
+
+    /** various scenarios with only wildcard consumers */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
+      val consumerCount = 
1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = 
PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount
 + 1))
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, 
PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+      }):_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        val streamCount = 
1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
+        ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", 
isWhitelist = true))
+      }):_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkUtils.zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, 
verifyAssignmentIsUniform = true)
+    }
+  }
+
+  @Test
+  def testFairPartitionAssignorStaticSubscriptions() {
+    val assignor = new FairAssignor
+
+    /** test static subscription scenarios */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
+      val consumerCount = 
1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = 
PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount
 + 1))
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, 
PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+      }):_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        val streamCounts = Map((1 to topicCount).map(topic => {
+            ("topic-" + topic, 1)
+          }):_*)
+        ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
+      }):_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkUtils.zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, 
verifyAssignmentIsUniform = true)
+    }
+  }
+
+  @Test
+  def testFairPartitionAssignorUnbalancedStaticSubscriptions() {
+    val assignor = new FairAssignor
+    val minConsumerCount = 5
+
+    /** test unbalanced static subscription scenarios */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
+      val consumerCount = 
minConsumerCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount
 + 1))
+      val topicCount = 10
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, 10)
+      }):_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        // Exclude some topics from some consumers
+        val topicRange = (1 to topicCount - consumer % minConsumerCount)
+        val streamCounts = Map(topicRange.map(topic => {
+            ("topic-" + topic, 3)
+          }):_*)
+        ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
+      }):_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkUtils.zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, 
verifyAssignmentIsUniform = true)
+    }
+  }
 }
 
 private object PartitionAssignorTest extends Logging {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> More optimally balanced partition assignment strategy
> -----------------------------------------------------
>
>                 Key: KAFKA-2435
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2435
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Andrew Olson
>            Assignee: Andrew Olson
>            Priority: Major
>             Fix For: 2.0.0
>
>         Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to whichever consumer subscribed to 
> that topic currently has the fewest partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to