This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 7131724819 KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599) 7131724819 is described below commit 7131724819d35ee08ff84a4cb9b8ca88bacb1311 Author: David Jacot <dja...@confluent.io> AuthorDate: Fri Sep 9 00:05:40 2022 +0200 KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (#12599) The consumer group instance ID is used to support a notion of "static" consumer groups. The idea is to be able to identify the same group instance across restarts so that a rebalance is not needed. However, if the user sets `group.instance.id` in the consumer configuration but uses "simple" assignment with `assign()`, the instance ID is nevertheless sent in the OffsetCommit request to the coordinator. This may result in a surprising UNKNOWN_MEMBER_ID error. This PR fixes the issue on the client side by not setting the group instance ID if the member ID is empty (no generation). 
Reviewers: Jason Gustafson <ja...@confluent.io> --- .../consumer/internals/ConsumerCoordinator.java | 5 ++++- .../internals/ConsumerCoordinatorTest.java | 26 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 9838e7dc8f..5228c60e0f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1272,8 +1272,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } final Generation generation; + final String groupInstanceId; if (subscriptions.hasAutoAssignedPartitions()) { generation = generationIfStable(); + groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null); // if the generation is null, we are not part of an active group (and we expect to be). // the only thing we can do is fail the commit and let the user rejoin the group in poll(). 
if (generation == null) { @@ -1293,6 +1295,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } else { generation = Generation.NO_GENERATION; + groupInstanceId = null; } OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( @@ -1300,7 +1303,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { .setGroupId(this.rebalanceConfig.groupId) .setGenerationId(generation.generationId) .setMemberId(generation.memberId) - .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null)) + .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values())) ); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index d948990d69..5e080b7721 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -2821,6 +2821,32 @@ public abstract class ConsumerCoordinatorTest { assertEquals(newGen, coordinator.generation()); } + @Test + public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() { + rebalanceConfig = buildRebalanceConfig(groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator( + rebalanceConfig, + new Metrics(), + assignors, + false, + subscriptions + ); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(5000)); + + client.prepareResponse(body -> { + OffsetCommitRequestData data = ((OffsetCommitRequest) body).data(); + return data.groupInstanceId() == null && data.memberId().isEmpty(); + }, offsetCommitResponse(Collections.emptyMap())); + + RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p, + new OffsetAndMetadata(100L, "metadata"))); + + 
assertTrue(consumerClient.poll(future, time.timer(5000))); + assertFalse(future.failed()); + } + @Test public void testCommitOffsetRebalanceInProgress() { // we cannot retry if a rebalance occurs before the commit completed