[ https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558652#comment-16558652 ]

ASF GitHub Bot commented on KAFKA-7126:
---------------------------------------

lindong28 closed pull request #5408: KAFKA-7126: Reduce number of rebalance for large consumer group after a topic is created
URL: https://github.com/apache/kafka/pull/5408
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 6c663cfac93..c8fa66b7e3b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -124,15 +124,27 @@ public synchronized void add(String topic) {
         }
     }
 
+    /**
+     * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed).
+     *
+     * @param nowMs current time in ms
+     * @return remaining time in ms till the cluster info can be updated again
+     */
+    public synchronized long timeToAllowUpdate(long nowMs) {
+        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+    }
+
     /**
      * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
      * current info can be updated (i.e. backoff time has elapsed); if an update has been requested then the expiry time
      * is now
+     *
+     * @param nowMs current time in ms
+     * @return remaining time in ms till updating the cluster info
      */
     public synchronized long timeToNextUpdate(long nowMs) {
         long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
-        return Math.max(timeToExpire, timeToAllowUpdate);
+        return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index f9b77e921cd..8f25d6e8eee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -328,6 +328,16 @@ public boolean poll(final long timeoutMs) {
                 // we need to ensure that the metadata is fresh before joining initially. This ensures
                 // that we have matched the pattern against the cluster's topics at least once before joining.
                 if (subscriptions.hasPatternSubscription()) {
+                    // For a consumer group that uses pattern-based subscription, after a topic is created,
+                    // any consumer that discovers the topic after a metadata refresh can trigger a rebalance
+                    // across the entire consumer group. Multiple rebalances can be triggered after one topic
+                    // creation if consumers refresh metadata at vastly different times. We can significantly
+                    // reduce the number of rebalances caused by a single topic creation by asking consumers to
+                    // refresh metadata before re-joining the group, as long as the refresh backoff time has
+                    // passed.
+                    if (this.metadata.timeToAllowUpdate(currentTime) == 0) {
+                        this.metadata.requestUpdate();
+                    }
                     if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                         return false;
                     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ba392c6f4cb..cec56b07e00 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -513,6 +513,50 @@ public boolean matches(AbstractRequest body) {
         assertEquals(newAssignmentSet, rebalanceListener.assigned);
     }
 
+    @Test
+    public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
+        // Set up a non-leader consumer with pattern subscription and a cluster containing one topic matching the
+        // pattern.
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(),
+            time.milliseconds());
+        assertEquals(singleton(topic1), subscriptions.subscription());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+        // Instrument the test so that metadata will contain two topics after next refresh.
+        client.prepareMetadataUpdate(cluster, Collections.emptySet());
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
+                return sync.memberId().equals(consumerId) &&
+                    sync.generationId() == 1 &&
+                    sync.groupAssignment().isEmpty();
+            }
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+        // This will trigger rebalance.
+        coordinator.poll(Long.MAX_VALUE);
+
+        // Make sure that the metadata was refreshed during the rebalance and thus subscriptions now contain two topics.
+        final Set<String> updatedSubscriptionSet = new HashSet<>(Arrays.asList(topic1, topic2));
+        assertEquals(updatedSubscriptionSet, subscriptions.subscription());
+
+        // Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger
+        // rebalance again.
+        metadata.requestUpdate();
+        client.poll(Long.MAX_VALUE, time.milliseconds());
+        assertFalse(coordinator.rejoinNeededOrPending());
+    }
+
     @Test
     public void testWakeupDuringJoin() {
         final String consumerId = "leader";
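
For readers skimming the patch: the fix hinges on the backoff arithmetic in the
new Metadata.timeToAllowUpdate() above. Below is a minimal, self-contained
sketch of that arithmetic, illustration only and not part of the PR. The
timestamps are made up, and the 100 ms value mirrors the retry.backoff.ms
default, which is the config that Metadata's refreshBackoffMs is set from:

import static java.lang.Math.max;

// Illustration only: the backoff arithmetic behind Metadata.timeToAllowUpdate(nowMs).
public class BackoffMathSketch {

    // Same expression as the method added in Metadata.java above.
    static long timeToAllowUpdate(long nowMs, long lastRefreshMs, long refreshBackoffMs) {
        return max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
    }

    public static void main(String[] args) {
        long lastRefreshMs = 10_000;  // last metadata refresh at t = 10s (made up)
        long backoffMs = 100;         // retry.backoff.ms default

        System.out.println(timeToAllowUpdate(10_050, lastRefreshMs, backoffMs)); // 50: still in backoff
        System.out.println(timeToAllowUpdate(10_100, lastRefreshMs, backoffMs)); // 0: update allowed now
        System.out.println(timeToAllowUpdate(11_000, lastRefreshMs, backoffMs)); // 0: long past backoff
    }
}

The ConsumerCoordinator change then requests a metadata update whenever this
returns 0 for a pattern-subscribed consumer that is about to re-join, so
stragglers pick up newly created topics during the rebalance itself instead of
discovering them one by one over the metadata.max.age.ms interval.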


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce number of rebalance period for large consumer groups after a topic is 
> created
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7126
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7126
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Jon Lee
>            Priority: Major
>             Fix For: 2.0.0, 2.1.0
>
>         Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with pattern-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumers over a 5-minute period. This caused the MM to lag significantly 
> behind during those 5 minutes, and the clusters may be considerably 
> out-of-sync during this period.
> Ideally we would like to trigger only one rebalance in the MM group after a 
> topic is created, and conceptually that should be doable.
>  
> Here is the explanation of this repeated consumer rebalance, based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic with 10 partitions is created in the cluster, and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after a metadata 
> refresh and triggers a rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are each assigned 1 
> partition of this topic. The other 190 consumers are not assigned any 
> partition of this topic. At this moment, the newly created topic will appear 
> in `ConsumerCoordinator.subscriptions.subscription` for those consumers that 
> are assigned a partition of this topic or that refreshed metadata before 
> time T0.
> 4) In the common case, half of the consumers have refreshed metadata before 
> the leader of the consumer group did. Thus around 100 + 10 = 
> 110 consumers have the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do 
> not have this topic in `ConsumerCoordinator.subscriptions.subscription`.
> 5) For those 90 consumers, whenever one refreshes metadata, it will add 
> this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata at almost the same 
> time, they jointly trigger one rebalance. Otherwise, each triggers a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worst case, 
> which is probably also the average case if the number of consumers in the 
> group is large, the last consumer will refresh its metadata 5 minutes after 
> T0, and rebalances will recur throughout this 5-minute interval. (A rough 
> simulation of steps 5 and 6 follows this quoted description.)
>  
>  
>  
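
To put rough numbers on steps 5) and 6) of the quoted description: the sketch
below (not from the ticket) simulates the 90 straggler consumers refreshing
metadata at uniformly random times over the 5-minute metadata.max.age.ms
window. The 3-second coalescing window, standing in for "a few consumers
refresh metadata at almost the same time", is an assumption, not a Kafka
setting:

import java.util.Arrays;
import java.util.Random;

// Back-of-the-envelope simulation of the rebalance storm described above.
public class RebalanceStormSketch {
    public static void main(String[] args) {
        final int stragglers = 90;        // consumers still missing the new topic after T0
        final double windowMs = 300_000;  // metadata.max.age.ms default: 5 minutes
        final double coalesceMs = 3_000;  // assumed: refreshes this close share one rebalance

        Random rnd = new Random(42);
        double[] refreshTimes = new double[stragglers];
        for (int i = 0; i < stragglers; i++)
            refreshTimes[i] = rnd.nextDouble() * windowMs;  // uniform over the window
        Arrays.sort(refreshTimes);

        int rebalances = 1;  // the first straggler refresh triggers one
        for (int i = 1; i < stragglers; i++)
            if (refreshTimes[i] - refreshTimes[i - 1] > coalesceMs)
                rebalances++;

        System.out.println("Approximate rebalances after one topic creation: " + rebalances);
    }
}

Typical runs print a few dozen rebalances, the same order of magnitude as the
~50 reported above. With the patch, every consumer refreshes metadata before
re-joining once a rebalance starts, so a single topic creation collapses to
roughly one rebalance.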



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
