dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1168675481


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1283,10 +1289,16 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
                 return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
             }
 
+            Uuid topicId = topicIdAndNames.getTopicIdOrZero(topicPartition.topic());

Review Comment:
   nit: `topicIdOrZero`?



##########
clients/src/main/java/org/apache/kafka/common/TopicIdAndNameBiMap.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the mapping between topic names and ids assuming a 1:1 relationship between
+ * a name and an id.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+public class TopicIdAndNameBiMap {
+    private final Map<String, Uuid> topicIds;
+    private final Map<Uuid, String> topicNames;
+
+    /**
+     * A mapping whose universe of topic ids and names is captured from the input map. The reverse
+     * association between a topic ID and a topic name is computed by this method. If more than one
+     * topic name resolves to the same topic ID, an {@link InvalidTopicException} is thrown.
+     */
+    public static TopicIdAndNameBiMap fromTopicIds(Map<String, Uuid> topicIds) {
+        Map<Uuid, String> topicNames = new HashMap<>(topicIds.size());
+
+        for (Map.Entry<String, Uuid> e: topicIds.entrySet()) {
+            String conflicting = topicNames.putIfAbsent(e.getValue(), e.getKey());
+            if (conflicting != null) {
+                throw new InvalidTopicException(
+                        "Topic " + e.getKey() + " shares the same ID " + e.getValue() + " as topic " + conflicting);
+            }
+        }
+
+        return new TopicIdAndNameBiMap(topicIds, topicNames);
+    }
+
+    /**
+     * A mapping which acts as a wrapper around the input mappings of topic ids from/to topic names.
+     * No validation of the consistency of the mapping is performed. This method is to be preferred
+     * when copying the input maps needs to be avoided.
+     */
+    public static TopicIdAndNameBiMap wrap(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames) {
+        return new TopicIdAndNameBiMap(topicIds, topicNames);
+    }
+
+    /**
+     * Used when no mapping between topic name and id exists.
+     */
+    public static TopicIdAndNameBiMap emptyMapping() {
+        return fromTopicIds(Collections.emptyMap());
+    }
+
+    private TopicIdAndNameBiMap(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames) {
+        this.topicIds = Collections.unmodifiableMap(topicIds);
+        this.topicNames = Collections.unmodifiableMap(topicNames);
+    }
+
+    /**
+     * Returns the ID of the topic with the given name, if that association exists.
+     */
+    public Uuid getTopicIdOrZero(String name) {
+        return Optional.ofNullable(topicIds.get(name)).orElse(Uuid.ZERO_UUID);

Review Comment:
   nit: Should we replace `ofNullable` by a simple `if/else` statement? 
Allocating an optional does not seem necessary here.
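
   For instance (a sketch of the suggested change; same semantics, no Optional
allocated):
   ```java
   public Uuid getTopicIdOrZero(String name) {
       Uuid topicId = topicIds.get(name);
       return topicId != null ? topicId : Uuid.ZERO_UUID;
   }
   ```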



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID);
         assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If there is no available
+        // response to consume, its internal poll loop never completes. Hence, the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses
+        // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the public API of the consumer,
+        // we cannot throw an UnknownTopicIdException to the user. By design, a false boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testRetryCommitUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+

Review Comment:
   nit: This empty line could be removed.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3981,23 +4348,13 @@ private void prepareJoinAndSyncResponse(String consumerId, int generation, List<
         }, syncGroupResponse(assignment, Errors.NONE));
     }
 
-    private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) {
-        final Map<TopicPartition, Errors> errors = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            errors.put(partition, error);
-        }
-        return errors;
-    }
-
-    private void respondToOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets, Errors error) {
-        Map<TopicPartition, Errors> errors = partitionErrors(expectedOffsets.keySet(), error);
-        client.respond(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors));
-    }
-
-    private MockClient.RequestMatcher offsetCommitRequestMatcher(final Map<TopicPartition, Long> expectedOffsets) {
+    private MockClient.RequestMatcher offsetCommitRequestMatcher(
+        Map<TopicPartition, Long> expectedOffsets,
+        Consumer<OffsetCommitRequest> requestConsumer

Review Comment:
   I am not sure I follow why we need this `Consumer` here. Couldn't we just
have a matcher which verifies what we want/need?
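
   For example, a matcher along these lines might be enough (a sketch, assuming
the verification only needs the offsets carried by the request):
   ```java
   private MockClient.RequestMatcher offsetCommitRequestMatcher(Map<TopicPartition, Long> expectedOffsets) {
       return body -> {
           OffsetCommitRequest request = (OffsetCommitRequest) body;
           // Match only if the request carries exactly the expected offsets.
           return expectedOffsets.equals(request.offsets());
       };
   }
   ```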



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1331,21 +1343,28 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
                         .setGenerationId(generation.generationId)
                         .setMemberId(generation.memberId)
                         .setGroupInstanceId(groupInstanceId)
-                        .setTopics(new ArrayList<>(requestTopicDataMap.values()))
+                        .setTopics(new ArrayList<>(requestTopicDataMap.values())),
+                canUseTopicIds
         );
 
         log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);
 
         return client.send(coordinator, builder)
-                .compose(new OffsetCommitResponseHandler(offsets, generation));
+                .compose(new OffsetCommitResponseHandler(offsets, generation, topicIdAndNames));
     }
 
     private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
         private final Map<TopicPartition, OffsetAndMetadata> offsets;
+        private final TopicIdAndNameBiMap topicIdAndNames;
 
-        private OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets, Generation generation) {
+        private OffsetCommitResponseHandler(
+                Map<TopicPartition, OffsetAndMetadata> offsets,
+                Generation generation,
+                TopicIdAndNameBiMap topicIdAndNames

Review Comment:
   nit: Indentation seems to be off here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1354,8 +1373,21 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
             Set<String> unauthorizedTopics = new HashSet<>();
 
             for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+                String topicName = topic.name();
+
+                if (this.response.requestHeader().apiVersion() >= 9) {
+                    topicName = topicIdAndNames.getTopicName(topic.topicId()).orElse(null);

Review Comment:
   nit: `topicNameOrNull` and get rid of the `Optional`?
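
   A possible shape (sketch): since `Map#get` already returns `null` when the
key is absent, the accessor reduces to a plain lookup:
   ```java
   public String getTopicNameOrNull(Uuid topicId) {
       return topicNames.get(topicId);
   }
   ```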



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -46,6 +49,8 @@
  *   - {@link Errors#INVALID_COMMIT_OFFSET_SIZE}
  *   - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
  *   - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ *   - {@link Errors#UNKNOWN_MEMBER_ID}

Review Comment:
   nit: This one is already in the list (L47).



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -129,16 +150,19 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public abstract class ConsumerCoordinatorTest {
-    private final String topic1 = "test1";
-    private final String topic2 = "test2";
-    private final TopicPartition t1p = new TopicPartition(topic1, 0);
-    private final TopicPartition t2p = new TopicPartition(topic2, 0);
-    private final String groupId = "test-group";
+    private static String topic1 = "test1";
+    private static String topic2 = "test2";
+    private static TopicPartition t1p = new TopicPartition(topic1, 0);
+    private static TopicIdPartition ti1p = new TopicIdPartition(Uuid.randomUuid(), t1p);

Review Comment:
   nit: Could we try to combine those? `private static TopicIdPartition t1p =
new TopicIdPartition(Uuid.randomUuid(), 0, topic1)`?
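
   The `TopicPartition` could then be derived where it is still needed, e.g.
(sketch):
   ```java
   private static TopicIdPartition ti1p = new TopicIdPartition(Uuid.randomUuid(), 0, topic1);
   private static TopicPartition t1p = ti1p.topicPartition();
   ```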



##########
clients/src/main/java/org/apache/kafka/common/TopicIdAndNameBiMap.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the mapping between topic names and ids assuming a 1:1 relationship between
+ * a name and an id.
+ * <p></p>

Review Comment:
   nit: Should we remove `<p></p>`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -159,13 +183,22 @@ public abstract class ConsumerCoordinatorTest {
     private final String consumerId = "consumer";
     private final String consumerId2 = "consumer2";
 
+    private final Map<String, Uuid> topicIds = new HashMap<>();
+    {
+        topicIds.put(topic1, ti1p.topicId());
+        topicIds.put(topic2, ti2p.topicId());
+    }
+
+    private final Map<String, Integer> partitionCounts = new HashMap<>();
+    {
+        partitionCounts.put(topic1, 1);
+        partitionCounts.put(topic2, 1);
+    }
+
+    private final TopicIdAndNameBiMap topicIdAndNames = TopicIdAndNameBiMap.fromTopicIds(topicIds);

Review Comment:
   nit: `topicIdAndNameBiMapping`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID);
         assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If there is no available
+        // response to consume, its internal poll loop never completes. Hence, the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses
+        // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the public API of the consumer,
+        // we cannot throw an UnknownTopicIdException to the user. By design, a false boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs)));
+

Review Comment:
   nit: This empty line could be removed.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID);
         assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If there is no available
+        // response to consume, its internal poll loop never completes. Hence, the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses
+        // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the public API of the consumer,
+        // we cannot throw an UnknownTopicIdException to the user. By design, a false boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testRetryCommitUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                // Unlike the commit offset sync API, the async API does not retry.

Review Comment:
   If the outcome of the test is different in this case, isn't it a bit weird 
to combine them in the same unit test?
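
   For illustration, the sync path could stand alone (a sketch reusing the
fixtures above; coordinator readiness setup omitted):
   ```java
   @Test
   public void testRetryCommitUnknownTopicIdSync() {
       client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID)));
       client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE)));
       assertTrue(coordinator.commitOffsetsSync(
           singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
   }
   ```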



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID);
         assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If there is no available
+        // response to consume, its internal poll loop never completes. Hence, the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses
+        // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the public API of the consumer,
+        // we cannot throw an UnknownTopicIdException to the user. By design, a false boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testRetryCommitUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                // Unlike the commit offset sync API, the async API does not retry.
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    static Stream<Arguments> commitOffsetTestArgs() {

Review Comment:
   This goes a bit too far in my opinion. We usually prefer to have simpler
parameterized tests. Could we simplify this somehow and bring stuff back in the
main unit test?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID);
         assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If there is no available
+        // response to consume, its internal poll loop never completes. Hence, the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses
+        // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the public API of the consumer,
+        // we cannot throw an UnknownTopicIdException to the user. By design, a false boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testRetryCommitUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                // Unlike the commit offset sync API, the async API does not retry.
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    static Stream<Arguments> commitOffsetTestArgs() {
+        Map<TopicIdPartition, Long> byTopicIdOffsets = new HashMap<>();
+        byTopicIdOffsets.put(ti1p, 100L);
+        byTopicIdOffsets.put(ti2p, 200L);
+
+        TopicIdPartition unknownTopicIdPartition =
+            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic3", 5));
+
+        Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>();
+        byTopicNameOffsets.put(ti1p, 100L);
+        byTopicNameOffsets.put(ti2p, 200L);
+        byTopicNameOffsets.put(unknownTopicIdPartition, 300L);
+
+        OffsetCommitRequestData byTopicIdData = new OffsetCommitRequestData()
+            .setGroupId(groupId)
+            .setGenerationId(OffsetCommitRequest.DEFAULT_GENERATION_ID)
+            .setTopics(Arrays.asList(
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti1p.topicId())
+                    .setName(topic1)
+                    .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                        .setPartitionIndex(t1p.partition())
+                        .setCommittedOffset(100L)
+                        .setCommittedMetadata("metadata"))),
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti2p.topicId())
+                    .setName(topic2)
+                    .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                        .setPartitionIndex(t2p.partition())
+                        .setCommittedOffset(200L)
+                        .setCommittedMetadata("metadata")))
+            ));
+
+        OffsetCommitRequestData byTopicNameData = byTopicIdData.duplicate();
+        byTopicNameData.topics().add(new OffsetCommitRequestTopic()
+            .setName(unknownTopicIdPartition.topic())
+            .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                .setPartitionIndex(5)
+                .setCommittedOffset(300L)
+                .setCommittedMetadata("metadata")))
+        );
+
+        return Stream.of(
+            Arguments.of(true, byTopicIdOffsets, byTopicIdData, (short) 9),
+            Arguments.of(false, byTopicIdOffsets, byTopicIdData, (short) 9),
+            Arguments.of(true, byTopicNameOffsets, byTopicNameData, (short) 8),
+            Arguments.of(false, byTopicNameOffsets, byTopicNameData, (short) 8)
+        );
+    }
+
+    private static Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata(Map<TopicIdPartition, Long> offsets) {
+        return offsets.entrySet().stream()
+            .map(e -> new SimpleEntry<>(e.getKey().topicPartition(), new OffsetAndMetadata(e.getValue(), "metadata")))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @ParameterizedTest
+    @MethodSource("commitOffsetTestArgs")
+    public void testTopicIdsArePopulatedByTheConsumerCoordinator(
+            boolean commitSync,
+            Map<TopicIdPartition, Long> offsets,
+            OffsetCommitRequestData expectedRequestData,
+            short expectedRequestVersion) {
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        OffsetCommitRequestCaptor captor = new OffsetCommitRequestCaptor();
+        prepareOffsetCommitRequest(offsets, Errors.NONE, false, captor);
+
+        Map<TopicPartition, OffsetAndMetadata> input = offsetAndMetadata(offsets);
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(input, time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(input, (inputOffsets, exception) -> {
+                // Notes:
+                // 1) The offsets passed to the callback are the same object provided to the offset commit method.
+                //    The validation on the offsets is not required but defensive.
+                // 2) We validate that the commit was successful, which is the case if the exception is null.
+                // 3) We validate this callback was invoked, which is not necessary but defensive.
+                assertSame(inputOffsets, input);
+                assertNull(exception);
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+
+        // The consumer does not provide a guarantee on the order of occurrence of topics and partitions in the
+        // OffsetCommit request, since a map of offsets is provided to the consumer API. Here, both requests
+        // are asserted to be identical irrespective of the order in which topic and partitions appear in the requests.
+        assertRequestEquals(
+            new OffsetCommitRequest(expectedRequestData, expectedRequestVersion),
+            captor.request
+        );
+    }
+
+    @ParameterizedTest
+    @NullSource
+    @ValueSource(strings = { "", "test1" })
+    public void testInvalidTopicIdReturnedByBrokerWhenCommittingOffsetSync(String topicName) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "m"));
+
+        // The following offset commit response is valid and the authorization failure results in failing the
+        // offset commit invocation.
+        client.prepareResponse(offsetCommitResponse(topicName, ti1p.topicId(), Errors.GROUP_AUTHORIZATION_FAILED));
+        assertThrows(GroupAuthorizationException.class,
+            () -> coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+
+        // The following offset commit response defines a topic incorrectly. The coordinator ignores the topic,
+        // and the group authorization failure is therefore not propagated.
+        client.prepareResponse(offsetCommitResponse(topicName, Uuid.ZERO_UUID, Errors.GROUP_AUTHORIZATION_FAILED));
+        assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));

Review Comment:
   This one made me think that we are probably not doing the right thing in the
implementation. In this particular case, if we have only one committed offset
and we don't have a response for it because the topic id is wrong, I think that
`commitOffsetsSync` should not succeed because we actually don't know if the
offset was committed or not. What do you think?
   
   One way around this would be to verify that we have received a response for
each topic-partition.
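
   For illustration, a helper like this could collect the partitions actually
covered by the response (a sketch; the name is hypothetical, and for version 9
the topic name would first have to be resolved from the topic id):
   ```java
   private static Set<TopicPartition> respondedPartitions(OffsetCommitResponseData data) {
       Set<TopicPartition> responded = new HashSet<>();
       for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : data.topics()) {
           for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
               responded.add(new TopicPartition(topic.name(), partition.partitionIndex()));
           }
       }
       return responded;
   }
   ```
   The handler could then fail the future when `offsets.keySet()` is not fully
contained in that set.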



##########
clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java:
##########
@@ -54,89 +68,204 @@ public class OffsetCommitRequestTest {
     protected static int throttleTimeMs = 10;
 
     private static OffsetCommitRequestData data;
-    private static List<OffsetCommitRequestTopic> topics;
+    private static OffsetCommitRequestPartition requestPartitionOne;
+    private static OffsetCommitRequestPartition requestPartitionTwo;
 
     @BeforeEach
     public void setUp() {
-        topics = Arrays.asList(
-            new OffsetCommitRequestTopic()
-                .setName(topicOne)
-                .setPartitions(Collections.singletonList(
-                    new OffsetCommitRequestPartition()
-                        .setPartitionIndex(partitionOne)
-                        .setCommittedOffset(offset)
-                        .setCommittedLeaderEpoch(leaderEpoch)
-                        .setCommittedMetadata(metadata)
-                )),
-            new OffsetCommitRequestTopic()
-                .setName(topicTwo)
-                .setPartitions(Collections.singletonList(
-                    new OffsetCommitRequestPartition()
-                        .setPartitionIndex(partitionTwo)
-                        .setCommittedOffset(offset)
-                        .setCommittedLeaderEpoch(leaderEpoch)
-                        .setCommittedMetadata(metadata)
-                ))
-        );
+        requestPartitionOne = new OffsetCommitRequestPartition()
+            .setPartitionIndex(partitionOne)
+            .setCommittedOffset(offset)
+            .setCommittedLeaderEpoch(leaderEpoch)
+            .setCommittedMetadata(metadata);
+
+        requestPartitionTwo = new OffsetCommitRequestPartition()
+            .setPartitionIndex(partitionTwo)
+            .setCommittedOffset(offset)
+            .setCommittedLeaderEpoch(leaderEpoch)
+            .setCommittedMetadata(metadata);
+
         data = new OffsetCommitRequestData()
-                   .setGroupId(groupId)
-                   .setTopics(topics);
+           .setGroupId(groupId)
+           .setTopics(Arrays.asList(
+               new OffsetCommitRequestTopic()
+                   .setTopicId(topicOneId)
+                   .setName(topicOne)
+                   .setPartitions(Collections.singletonList(requestPartitionOne)),
+               new OffsetCommitRequestTopic()
+                   .setTopicId(topicTwoId)
+                   .setName(topicTwo)
+                   .setPartitions(Collections.singletonList(requestPartitionTwo))
+           ));
     }
 
-    @Test
-    public void testConstructor() {
-        Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
-        expectedOffsets.put(new TopicPartition(topicOne, partitionOne), offset);
-        expectedOffsets.put(new TopicPartition(topicTwo, partitionTwo), offset);
+    public static Map<TopicPartition, Long> offsets(
+        OffsetCommitRequest request,
+        TopicIdAndNameBiMap topicIdAndNames
+    ) {
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        for (OffsetCommitRequestTopic topic : request.data().topics()) {
+            String topicName = topic.name();
 
-        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
+            if (request.version() >= 9) {
+                topicName = topicIdAndNames.getTopicName(topic.topicId()).orElseThrow(
+                    () -> new UnknownTopicIdException("Topic with ID " + topic.topicId() + " not found."));
+            }
 
-        for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
-            OffsetCommitRequest request = builder.build(version);
-            assertEquals(expectedOffsets, request.offsets());
+            for (OffsetCommitRequestData.OffsetCommitRequestPartition partition : topic.partitions()) {
+                offsets.put(new TopicPartition(topicName, partition.partitionIndex()), partition.committedOffset());
+            }
+        }
+        return offsets;
+    }
 
-            OffsetCommitResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception());
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testConstructor(short version) {
+        OffsetCommitRequest request = new OffsetCommitRequest.Builder(data, true).build(version);
+        OffsetCommitResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception());
 
-            assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts());
-            assertEquals(throttleTimeMs, response.throttleTimeMs());
-        }
+        assertEquals(data, request.data());
+        assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts());
+        assertEquals(throttleTimeMs, response.throttleTimeMs());
     }
 
     @Test
     public void testGetErrorResponseTopics() {
         List<OffsetCommitResponseTopic> expectedTopics = Arrays.asList(
             new OffsetCommitResponseTopic()
                 .setName(topicOne)
+                .setTopicId(topicOneId)
                 .setPartitions(Collections.singletonList(
                     new OffsetCommitResponsePartition()
                         .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
                         .setPartitionIndex(partitionOne))),
             new OffsetCommitResponseTopic()
                 .setName(topicTwo)
+                .setTopicId(topicTwoId)
                 .setPartitions(Collections.singletonList(
                     new OffsetCommitResponsePartition()
                         .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
                         .setPartitionIndex(partitionTwo)))
         );
-        assertEquals(expectedTopics, getErrorResponseTopics(topics, Errors.UNKNOWN_MEMBER_ID));
+        assertEquals(expectedTopics, getErrorResponseTopics(data.topics(), Errors.UNKNOWN_MEMBER_ID));
     }
 
-    @Test
-    public void testVersionSupportForGroupInstanceId() {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testVersionSupportForGroupInstanceId(short version) {
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
             new OffsetCommitRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId)
-                .setGroupInstanceId(groupInstanceId)
+                .setGroupInstanceId(groupInstanceId),
+            true
         );
 
-        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
-            if (version >= 7) {
-                builder.build(version);
-            } else {
-                final short finalVersion = version;
-                assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion));
-            }
+        if (version >= 7) {
+            builder.build(version);
+        } else {
+            final short finalVersion = version;
+            assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testResolvesTopicNameIfRequiredWhenListingOffsets(short version) {
+        OffsetCommitRequest request = new OffsetCommitRequest.Builder(data, true).build(version);
+        List<OffsetCommitRequestTopic> topics = request.data().topics();
+
+        assertEquals(2, topics.stream().flatMap(t -> t.partitions().stream()).count());
+        assertEquals(requestPartitionOne, topics.get(0).partitions().get(0));
+        assertEquals(requestPartitionTwo, topics.get(1).partitions().get(0));
+    }
+
+    @Test
+    public void testUnresolvableTopicIdWhenListingOffset() {
+        OffsetCommitRequest request = new OffsetCommitRequest.Builder(data.duplicate(), true).build((short) 9);
+        assertThrows(UnknownTopicIdException.class,
+            () -> OffsetCommitRequestTest.offsets(request, TopicIdAndNameBiMap.emptyMapping()));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void maxAllowedVersionIsEightIfRequestCannotUseTopicIds(boolean canUseTopicIds) {
+        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data.duplicate(), canUseTopicIds);
+        assertEquals(canUseTopicIds ? 9 : 8, builder.build(builder.latestAllowedVersion()).version());
+    }
+
+    /**
+     * Compares the two {@link OffsetCommitRequest} independently of the order in which the
+     * {@link OffsetCommitRequestTopic} and {@link OffsetCommitRequestPartition} are defined in the request.
+     */
+    public static void assertRequestEquals(OffsetCommitRequest expectedRequest, OffsetCommitRequest actualRequest) {
+        if (expectedRequest.version() > 9 || actualRequest.version() > 9) {
+            throw new AssertionError("A new version of OffsetCommitRequest has been detected. Please " +
+                    "review the equality contract enforced here and add/remove fields accordingly.");
         }
+
+        OffsetCommitRequestData expected = expectedRequest.data();
+        OffsetCommitRequestData actual = actualRequest.data();
+
+        assertEquals(expectedRequest.version(), actualRequest.version());
+        assertEquals(expected.groupId(), actual.groupId(), "Group id 
mismatch");
+        assertEquals(expected.groupInstanceId(), actual.groupInstanceId(), 
"Group instance id mismatch");
+        assertEquals(expected.generationId(), actual.generationId(), 
"Generation id mismatch");
+        assertEquals(expected.memberId(), actual.memberId(), "Member id 
mismatch");
+        assertEquals(expected.retentionTimeMs(), actual.retentionTimeMs(), 
"Retention time mismatch");
+
+        Function<OffsetCommitRequestTopic, List<OffsetCommitRequestPartition>> partitionSelector =
+            OffsetCommitRequestTopic::partitions;
+
+        Function<OffsetCommitRequestTopic, NameAndId> topicClassifier =
+            topic -> new NameAndId(topic.name(), topic.topicId());
+
+        BiFunction<NameAndId, OffsetCommitRequestPartition, TopicIdPartition> partitionClassifier =
+            (nameAndId, p) -> new TopicIdPartition(nameAndId.id(), p.partitionIndex(), nameAndId.name());
+
+        Function<OffsetCommitRequestData, Map<TopicIdPartition, OffsetCommitRequestPartition>> partitioner =
+            request -> partition(request.topics(), partitionSelector, topicClassifier, partitionClassifier);
+
+        assertEquals(partitioner.apply(expected), partitioner.apply(actual));
+    }
+
+    /**
+     * Compares the two {@link OffsetCommitResponse} independently of the order in which the
+     * {@link OffsetCommitResponseTopic} and {@link OffsetCommitResponsePartition} are defined in the response.
+     */
+    public static void assertResponseEquals(OffsetCommitResponse expected, OffsetCommitResponse actual) {
+        assertEquals(expected.throttleTimeMs(), actual.throttleTimeMs());
+        assertEquals(expected.errorCounts(), actual.errorCounts());
+
+        Function<OffsetCommitResponseTopic, List<OffsetCommitResponsePartition>> partitionSelector =
+            OffsetCommitResponseTopic::partitions;
+
+        Function<OffsetCommitResponseTopic, NameAndId> topicClassifier =
+            topic -> new NameAndId(topic.name(), topic.topicId());
+
+        BiFunction<NameAndId, OffsetCommitResponsePartition, TopicIdPartition> partitionClassifier =
+            (nameAndId, p) -> new TopicIdPartition(nameAndId.id(), p.partitionIndex(), nameAndId.name());
+
+        Function<OffsetCommitResponse, Map<TopicIdPartition, OffsetCommitResponsePartition>> partitioner =
+            response -> partition(response.data().topics(), partitionSelector, topicClassifier, partitionClassifier);
+
+        assertEquals(partitioner.apply(expected), partitioner.apply(actual));
+    }
+
+    private static <T, P> Map<TopicIdPartition, P> partition(

Review Comment:
   This seems to be a quite complicated way to group 
`OffsetCommitRequestPartition` or `OffsetCommitResponsePartition` by 
`TopicIdPartition`, no? I would just write two methods to do just this.
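
   For instance, the request-side method could look like this (a sketch):
   ```java
   private static Map<TopicIdPartition, OffsetCommitRequestPartition> groupRequestPartitions(OffsetCommitRequestData data) {
       Map<TopicIdPartition, OffsetCommitRequestPartition> grouped = new HashMap<>();
       for (OffsetCommitRequestTopic topic : data.topics()) {
           for (OffsetCommitRequestPartition partition : topic.partitions()) {
               grouped.put(new TopicIdPartition(topic.topicId(), partition.partitionIndex(), topic.name()), partition);
           }
       }
       return grouped;
   }
   ```
   The response-side sibling would do the same over `OffsetCommitResponseTopic`
and `OffsetCommitResponsePartition`.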



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
