[jira] [Commented] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376261#comment-16376261
 ] 

ASF GitHub Bot commented on KAFKA-2435:
---

hachikuji closed pull request #146: KAFKA-2435: Fair consumer partition 
assignment strategy
URL: https://github.com/apache/kafka/pull/146
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based pull request on
merge, it is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 9eff3edd393..61c26a1a1c3 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -51,7 +51,7 @@ object ConsumerConfig extends Config {
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val ExcludeInternalTopics = true
-  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
+  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", "roundrobin", and "fair" */
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
 
@@ -93,8 +93,9 @@ object ConsumerConfig extends Config {
 strategy match {
   case "range" =>
   case "roundrobin" =>
+  case "fair" =>
   case _ => throw new InvalidConfigException("Wrong value " + strategy + " of partition.assignment.strategy in consumer config; " +
-"Valid values are 'range' and 'roundrobin'")
+"Valid values are 'range', 'roundrobin', and 'fair'")
 }
   }
 }
@@ -193,7 +194,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
   val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
 
-  /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
+  /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin, fair */
   val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
   
   validate(this)
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index f02df352acf..42b35f7ec6d 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -37,6 +37,7 @@ trait PartitionAssignor {
 object PartitionAssignor {
   def createInstance(assignmentStrategy: String) = assignmentStrategy match {
 case "roundrobin" => new RoundRobinAssignor()
+case "fair" => new FairAssignor()
 case _ => new RangeAssignor()
   }
 }
@@ -162,3 +163,76 @@ class RangeAssignor() extends PartitionAssignor with Logging {
 partitionAssignment
   }
 }
+
+/**
+ * The fair assignor attempts to balance partitions across consumers such that each consumer thread is assigned approximately
+ * the same number of partitions, even if the consumer topic subscriptions are substantially different (if they are identical,
+ * then the result will be equivalent to that of the roundrobin assignor). The running total of assignments per consumer
+ * thread is tracked as the algorithm executes in order to accomplish this.
+ *
+ * The algorithm starts with the topic with the fewest consumer subscriptions, and assigns its partitions in roundrobin
+ * fashion. In the event of a tie for least subscriptions, the topic with the highest partition count is assigned first, as
+ * this generally creates a more balanced distribution. The final tiebreaker is the topic name.
+ *
+ * The partitions for subsequent topics are assigned to the subscribing consumer with the fewest number of assignments.
+ * In the event of a tie for least assignments, the tiebreaker is the consumer id, so that the assignment pattern is fairly
+ * similar to how the roundrobin assignor functions.
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. If both C0 and C1 are consuming t0, but only C1 is
+ * consuming t1 then the assignment will be:
+ * C0 -> [t0p0, t0p1, t0p2]
+ * C1 -> [t1p0, t1p1, t1p2]
+ */
+class FairAssignor() extends PartitionAssignor with Logging {
+
+  def assign(ctx: AssignmentContext) = {
+val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+val partitionAssignment =
+ 

[jira] [Commented] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-08 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357214#comment-16357214
 ] 

Andrew Olson commented on KAFKA-2435:
-

[~jeffwidman] yes, that sounds quite reasonable to me.

> More optimally balanced partition assignment strategy
> -
>
> Key: KAFKA-2435
> URL: https://issues.apache.org/jira/browse/KAFKA-2435
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
> it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
>
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
>
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
>
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.
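
The strategy in the issue description above can be sketched in a few lines of
Scala. This is a minimal, simplified illustration and not the code from the
pull request: the object name FairAssignmentSketch, the assign signature, and
the use of plain strings in place of Kafka's TopicAndPartition and
ConsumerThreadId types are all assumptions made for the example. It orders
topics by fewest subscribers, then most partitions, then name, and gives each
partition to the subscribed consumer with the fewest assignments so far,
breaking ties by consumer id.

```scala
// Hypothetical, simplified model: plain strings stand in for Kafka's
// TopicAndPartition and ConsumerThreadId types.
object FairAssignmentSketch {

  /** Assigns each partition to a subscribed consumer, keeping totals even.
    *
    * @param partitionsPerTopic topic name -> partition count
    * @param subscriptions      consumer id -> topics it subscribes to
    * @return consumer id -> assigned partitions, named like "t0p1"
    */
  def assign(partitionsPerTopic: Map[String, Int],
             subscriptions: Map[String, Set[String]]): Map[String, Vector[String]] = {

    // Invert the subscriptions: topic -> interested consumers, sorted by id.
    val consumersByTopic: Map[String, Vector[String]] =
      partitionsPerTopic.keys.map { topic =>
        topic -> subscriptions.collect { case (c, topics) if topics(topic) => c }.toVector.sorted
      }.toMap

    // Most constrained topics first: fewest consumers, then most partitions, then name.
    // Topics with no subscriber are skipped.
    val topicOrder = partitionsPerTopic.keys.toVector
      .filter(t => consumersByTopic(t).nonEmpty)
      .sortBy(t => (consumersByTopic(t).size, -partitionsPerTopic(t), t))

    // Every consumer starts with an empty assignment.
    val start: Map[String, Vector[String]] =
      subscriptions.keys.map(_ -> Vector.empty[String]).toMap

    topicOrder.foldLeft(start) { (acc, topic) =>
      (0 until partitionsPerTopic(topic)).foldLeft(acc) { (current, p) =>
        // Fewest assignments so far wins; ties go to the lowest consumer id.
        val target = consumersByTopic(topic).minBy(c => (current(c).size, c))
        current.updated(target, current(target) :+ s"${topic}p$p")
      }
    }
  }
}
```

Calling FairAssignmentSketch.assign(Map("t0" -> 3, "t1" -> 3), Map("C0" ->
Set("t0"), "C1" -> Set("t0", "t1"))) reproduces the example from the
FairAssignor doc comment: t1 is handled first because only C1 subscribes to
it, after which all of t0 goes to the less loaded C0.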



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-08 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357203#comment-16357203
 ] 

Jeff Widman commented on KAFKA-2435:


Can this be closed as wontfix? 

Two reasons:

1) It targets the deprecated (removed?) high-level consumer.

2) KIP-54 addressed some of the concerns here, and IMHO is a better solution 
because it addresses both fairness and affinity/stickiness.
