This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 7131724819 KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)
7131724819 is described below

commit 7131724819d35ee08ff84a4cb9b8ca88bacb1311
Author: David Jacot <dja...@confluent.io>
AuthorDate: Fri Sep 9 00:05:40 2022 +0200

    KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599)
    
    The consumer group instance ID is used to support a notion of "static"
    consumer groups. The idea is to be able to identify the same group instance
    across restarts so that a rebalance is not needed. However, if the user sets
    `group.instance.id` in the consumer configuration but uses "simple" assignment
    with `assign()`, the instance ID is nevertheless sent in the OffsetCommit
    request to the coordinator. This may result in a surprising UNKNOWN_MEMBER_ID
    error.
    
    This PR fixes the issue on the client side by not setting the group
    instance ID if the member ID is empty (no generation).
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    |  5 ++++-
 .../internals/ConsumerCoordinatorTest.java         | 26 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9838e7dc8f..5228c60e0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1272,8 +1272,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
 
         final Generation generation;
+        final String groupInstanceId;
         if (subscriptions.hasAutoAssignedPartitions()) {
             generation = generationIfStable();
+            groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
             // if the generation is null, we are not part of an active group (and we expect to be).
             // the only thing we can do is fail the commit and let the user rejoin the group in poll().
             if (generation == null) {
@@ -1293,6 +1295,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             }
         } else {
             generation = Generation.NO_GENERATION;
+            groupInstanceId = null;
         }
 
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
@@ -1300,7 +1303,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         .setGroupId(this.rebalanceConfig.groupId)
                         .setGenerationId(generation.generationId)
                         .setMemberId(generation.memberId)
-                        .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
+                        .setGroupInstanceId(groupInstanceId)
                         .setTopics(new ArrayList<>(requestTopicDataMap.values()))
         );
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index d948990d69..5e080b7721 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2821,6 +2821,32 @@ public abstract class ConsumerCoordinatorTest {
         assertEquals(newGen, coordinator.generation());
     }
 
+    @Test
+    public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
+        rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(
+            rebalanceConfig,
+            new Metrics(),
+            assignors,
+            false,
+            subscriptions
+        );
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(5000));
+
+        client.prepareResponse(body -> {
+            OffsetCommitRequestData data = ((OffsetCommitRequest) body).data();
+            return data.groupInstanceId() == null && data.memberId().isEmpty();
+        }, offsetCommitResponse(Collections.emptyMap()));
+
+        RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+            new OffsetAndMetadata(100L, "metadata")));
+
+        assertTrue(consumerClient.poll(future, time.timer(5000)));
+        assertFalse(future.failed());
+    }
+
     @Test
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed

Reply via email to