dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1168675481
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -1283,10 +1289,16 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset())); } + Uuid topicId = topicIdAndNames.getTopicIdOrZero(topicPartition.topic()); Review Comment: nit: `topicIdOrZero`? ########## clients/src/main/java/org/apache/kafka/common/TopicIdAndNameBiMap.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import org.apache.kafka.common.errors.InvalidTopicException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Encapsulates the mapping between topic names and ids assuming a 1:1 relationship between + * a name and an id. + * <p></p> + * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but + * not to characterize the set of topics which are known by a client. Use the + * {@link org.apache.kafka.clients.MetadataCache} for that purpose. 
+ */ +public class TopicIdAndNameBiMap { + private final Map<String, Uuid> topicIds; + private final Map<Uuid, String> topicNames; + + /** + * A mapping which universe of topic ids and names is captured from the input map. The reverse association + * between a topic ID and a topic name is computed by this method. If there are more than one topic name + * resolving to the same topic ID, an {@link InvalidTopicException} is thrown. + */ + public static TopicIdAndNameBiMap fromTopicIds(Map<String, Uuid> topicIds) { + Map<Uuid, String> topicNames = new HashMap<>(topicIds.size()); + + for (Map.Entry<String, Uuid> e: topicIds.entrySet()) { + String conflicting = topicNames.putIfAbsent(e.getValue(), e.getKey()); + if (conflicting != null) { + throw new IllegalStateException( + "Topic " + e.getKey() + " shares the same ID " + e.getValue() + " as topic " + conflicting); + } + } + + return new TopicIdAndNameBiMap(topicIds, topicNames); + } + + /** + * A mapping which acts as a wrapper around the input mapping of topic ids from/to topic names. + * No validation is performed about the consistency of the mapping. This method is to be preferred + * when the copy of the input maps needs to be avoided. + */ + public static TopicIdAndNameBiMap wrap(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames) { + return new TopicIdAndNameBiMap(topicIds, topicNames); + } + + /** + * Used when no mapping between topic name and id exists. + */ + public static TopicIdAndNameBiMap emptyMapping() { + return fromTopicIds(Collections.emptyMap()); + } + + private TopicIdAndNameBiMap(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames) { + this.topicIds = Collections.unmodifiableMap(topicIds); + this.topicNames = Collections.unmodifiableMap(topicNames); + } + + /** + * Returns the ID of the topic with the given name, if that association exists. 
+ */ + public Uuid getTopicIdOrZero(String name) { + return Optional.ofNullable(topicIds.get(name)).orElse(Uuid.ZERO_UUID); Review Comment: nit: Should we replace `ofNullable` by a simple `if/else` statement? Allocating an optional does not seem necessary here. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID); + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID); assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE))); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testCommitOffsetUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error. + // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt. + // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked. + // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this + // timer never moves forward once the network client is invoked. If there is no available + // response to consume, its internal poll loop never completes. Hence, the timeout needs to be + // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses + // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.). 
+ int offsetCommitCalls = 5; + long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls; + + IntStream.range(0, offsetCommitCalls).forEach(__ -> + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID)); + + // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as + // the timeout allows. Note that since topic ids are not part of the public API of the consumer, + // we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the + // offset commit failed is returned. + + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs))); + + } else { + AtomicBoolean callbackInvoked = new AtomicBoolean(); + coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> { + assertSame(inputOffsets, offsets); + assertEquals(RetriableCommitFailedException.class, exception.getClass()); + assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass()); + callbackInvoked.set(true); + }); + + coordinator.invokeCompletedOffsetCommitCallbacks(); + assertTrue(callbackInvoked.get()); + } + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testRetryCommitUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID))); + client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE))); + + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE))); + Review Comment: nit: This empty line could be removed. 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -3981,23 +4348,13 @@ private void prepareJoinAndSyncResponse(String consumerId, int generation, List< }, syncGroupResponse(assignment, Errors.NONE)); } - private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) { - final Map<TopicPartition, Errors> errors = new HashMap<>(); - for (TopicPartition partition : partitions) { - errors.put(partition, error); - } - return errors; - } - - private void respondToOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets, Errors error) { - Map<TopicPartition, Errors> errors = partitionErrors(expectedOffsets.keySet(), error); - client.respond(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors)); - } - - private MockClient.RequestMatcher offsetCommitRequestMatcher(final Map<TopicPartition, Long> expectedOffsets) { + private MockClient.RequestMatcher offsetCommitRequestMatcher( + Map<TopicPartition, Long> expectedOffsets, + Consumer<OffsetCommitRequest> requestConsumer Review Comment: I am not sure I follow why we need this `Consumer` here. Couldn't we just have a matcher which verifies what we want/need?
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -1331,21 +1343,28 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM .setGenerationId(generation.generationId) .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) - .setTopics(new ArrayList<>(requestTopicDataMap.values())) + .setTopics(new ArrayList<>(requestTopicDataMap.values())), + canUseTopicIds ); log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator); return client.send(coordinator, builder) - .compose(new OffsetCommitResponseHandler(offsets, generation)); + .compose(new OffsetCommitResponseHandler(offsets, generation, topicIdAndNames)); } private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> { private final Map<TopicPartition, OffsetAndMetadata> offsets; + private final TopicIdAndNameBiMap topicIdAndNames; - private OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets, Generation generation) { + private OffsetCommitResponseHandler( + Map<TopicPartition, OffsetAndMetadata> offsets, + Generation generation, + TopicIdAndNameBiMap topicIdAndNames Review Comment: nit: Indentation seems to be off here. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -1354,8 +1373,21 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu Set<String> unauthorizedTopics = new HashSet<>(); for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { + String topicName = topic.name(); + + if (this.response.requestHeader().apiVersion() >= 9) { + topicName = topicIdAndNames.getTopicName(topic.topicId()).orElse(null); Review Comment: nit: `topicNameOrNull` and get rid of the `Optional`? 
########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ########## @@ -46,6 +49,8 @@ * - {@link Errors#INVALID_COMMIT_OFFSET_SIZE} * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#UNKNOWN_MEMBER_ID} Review Comment: nit: This one is already in the list (L47). ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -129,16 +150,19 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; public abstract class ConsumerCoordinatorTest { - private final String topic1 = "test1"; - private final String topic2 = "test2"; - private final TopicPartition t1p = new TopicPartition(topic1, 0); - private final TopicPartition t2p = new TopicPartition(topic2, 0); - private final String groupId = "test-group"; + private static String topic1 = "test1"; + private static String topic2 = "test2"; + private static TopicPartition t1p = new TopicPartition(topic1, 0); + private static TopicIdPartition ti1p = new TopicIdPartition(Uuid.randomUuid(), t1p); Review Comment: nit: Could we try to combine those? `private static TopicIdPartition t1p = new TopicIdPartition(Uuid.randomUuid(), 0, topic1)`? ########## clients/src/main/java/org/apache/kafka/common/TopicIdAndNameBiMap.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import org.apache.kafka.common.errors.InvalidTopicException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Encapsulates the mapping between topic names and ids assuming a 1:1 relationship between + * a name and an id. + * <p></p> Review Comment: nit: Should we remove `<p></p>`? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -159,13 +183,22 @@ public abstract class ConsumerCoordinatorTest { private final String consumerId = "consumer"; private final String consumerId2 = "consumer2"; + private final Map<String, Uuid> topicIds = new HashMap<>(); + { + topicIds.put(topic1, ti1p.topicId()); + topicIds.put(topic2, ti2p.topicId()); + } + + private final Map<String, Integer> partitionCounts = new HashMap<>(); + { + partitionCounts.put(topic1, 1); + partitionCounts.put(topic2, 1); + } + + private final TopicIdAndNameBiMap topicIdAndNames = TopicIdAndNameBiMap.fromTopicIds(topicIds); Review Comment: nit: `topicIdAndNameBiMapping`? 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID); + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID); assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE))); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testCommitOffsetUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error. + // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt. + // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked. + // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this + // timer never moves forward once the network client is invoked. If there is no available + // response to consume, its internal poll loop never completes. Hence, the timeout needs to be + // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses + // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.). 
+ int offsetCommitCalls = 5; + long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls; + + IntStream.range(0, offsetCommitCalls).forEach(__ -> + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID)); + + // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as + // the timeout allows. Note that since topic ids are not part of the public API of the consumer, + // we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the + // offset commit failed is returned. + + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs))); + Review Comment: nit: This empty line could be removed. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID); + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID); assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE))); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testCommitOffsetUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error. + // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt. 
+ // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked. + // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this + // timer never moves forward once the network client is invoked. If there is no available + // response to consume, its internal poll loop never completes. Hence, the timeout needs to be + // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses + // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.). + int offsetCommitCalls = 5; + long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls; + + IntStream.range(0, offsetCommitCalls).forEach(__ -> + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID)); + + // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as + // the timeout allows. Note that since topic ids are not part of the public API of the consumer, + // we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the + // offset commit failed is returned. 
+ + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs))); + + } else { + AtomicBoolean callbackInvoked = new AtomicBoolean(); + coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> { + assertSame(inputOffsets, offsets); + assertEquals(RetriableCommitFailedException.class, exception.getClass()); + assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass()); + callbackInvoked.set(true); + }); + + coordinator.invokeCompletedOffsetCommitCallbacks(); + assertTrue(callbackInvoked.get()); + } + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testRetryCommitUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID))); + client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE))); + + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE))); + + } else { + AtomicBoolean callbackInvoked = new AtomicBoolean(); + coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> { + // Unlike the commit offset sync API, the async API does not retry. Review Comment: If the outcome of the test is different in this case, isn't it a bit weird to combine them in the same unit test? 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID); + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID); assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE))); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testCommitOffsetUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error. + // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt. + // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked. + // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this + // timer never moves forward once the network client is invoked. If there is no available + // response to consume, its internal poll loop never completes. Hence, the timeout needs to be + // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses + // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.). 
+ int offsetCommitCalls = 5; + long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls; + + IntStream.range(0, offsetCommitCalls).forEach(__ -> + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID)); + + // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as + // the timeout allows. Note that since topic ids are not part of the public API of the consumer, + // we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the + // offset commit failed is returned. + + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs))); + + } else { + AtomicBoolean callbackInvoked = new AtomicBoolean(); + coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> { + assertSame(inputOffsets, offsets); + assertEquals(RetriableCommitFailedException.class, exception.getClass()); + assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass()); + callbackInvoked.set(true); + }); + + coordinator.invokeCompletedOffsetCommitCallbacks(); + assertTrue(callbackInvoked.get()); + } + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testRetryCommitUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID))); + client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE))); + + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE))); + + } else { + AtomicBoolean callbackInvoked = new 
AtomicBoolean(); + coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> { + // Unlike the commit offset sync API, the async API does not retry. + assertSame(inputOffsets, offsets); + assertEquals(RetriableCommitFailedException.class, exception.getClass()); + assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass()); + callbackInvoked.set(true); + }); + + coordinator.invokeCompletedOffsetCommitCallbacks(); + assertTrue(callbackInvoked.get()); + } + } + + static Stream<Arguments> commitOffsetTestArgs() { Review Comment: This goes a bit too far in my opinion. We usually prefer to have simpler parameterized tests. Could we simplify this somehow and bring this stuff back into the main unit test? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID); + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID); assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE))); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testCommitOffsetUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error. + // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt. + // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+ // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this + // timer never moves forward once the network client is invoked. If there is no available + // response to consume, its internal poll loop never completes. Hence, the timeout needs to be + // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses + // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.). + int offsetCommitCalls = 5; + long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls; + + IntStream.range(0, offsetCommitCalls).forEach(__ -> + prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID)); + + // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as + // the timeout allows. Note that since topic ids are not part of the public API of the consumer, + // we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the + // offset commit failed is returned. 
+ + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs))); + + } else { + AtomicBoolean callbackInvoked = new AtomicBoolean(); + coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> { + assertSame(inputOffsets, offsets); + assertEquals(RetriableCommitFailedException.class, exception.getClass()); + assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass()); + callbackInvoked.set(true); + }); + + coordinator.invokeCompletedOffsetCommitCallbacks(); + assertTrue(callbackInvoked.get()); + } + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testRetryCommitUnknownTopicId(boolean commitSync) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID))); + client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE))); + + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap( + t1p, + new OffsetAndMetadata(100L, "metadata") + ); + + if (commitSync) { + assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE))); + + } else { + AtomicBoolean callbackInvoked = new AtomicBoolean(); + coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> { + // Unlike the commit offset sync API, the async API does not retry. 
+ assertSame(inputOffsets, offsets); + assertEquals(RetriableCommitFailedException.class, exception.getClass()); + assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass()); + callbackInvoked.set(true); + }); + + coordinator.invokeCompletedOffsetCommitCallbacks(); + assertTrue(callbackInvoked.get()); + } + } + + static Stream<Arguments> commitOffsetTestArgs() { + Map<TopicIdPartition, Long> byTopicIdOffsets = new HashMap<>(); + byTopicIdOffsets.put(ti1p, 100L); + byTopicIdOffsets.put(ti2p, 200L); + + TopicIdPartition unknownTopicIdPartition = + new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic3", 5)); + + Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>(); + byTopicNameOffsets.put(ti1p, 100L); + byTopicNameOffsets.put(ti2p, 200L); + byTopicNameOffsets.put(unknownTopicIdPartition, 300L); + + OffsetCommitRequestData byTopicIdData = new OffsetCommitRequestData() + .setGroupId(groupId) + .setGenerationId(OffsetCommitRequest.DEFAULT_GENERATION_ID) + .setTopics(Arrays.asList( + new OffsetCommitRequestTopic() + .setTopicId(ti1p.topicId()) + .setName(topic1) + .setPartitions(singletonList(new OffsetCommitRequestPartition() + .setPartitionIndex(t1p.partition()) + .setCommittedOffset(100L) + .setCommittedMetadata("metadata"))), + new OffsetCommitRequestTopic() + .setTopicId(ti2p.topicId()) + .setName(topic2) + .setPartitions(singletonList(new OffsetCommitRequestPartition() + .setPartitionIndex(t2p.partition()) + .setCommittedOffset(200L) + .setCommittedMetadata("metadata"))) + )); + + OffsetCommitRequestData byTopicNameData = byTopicIdData.duplicate(); + byTopicNameData.topics().add(new OffsetCommitRequestTopic() + .setName(unknownTopicIdPartition.topic()) + .setPartitions(singletonList(new OffsetCommitRequestPartition() + .setPartitionIndex(5) + .setCommittedOffset(300L) + .setCommittedMetadata("metadata"))) + ); + + return Stream.of( + Arguments.of(true, byTopicIdOffsets, byTopicIdData, (short) 9), + 
Arguments.of(false, byTopicIdOffsets, byTopicIdData, (short) 9), + Arguments.of(true, byTopicNameOffsets, byTopicNameData, (short) 8), + Arguments.of(false, byTopicNameOffsets, byTopicNameData, (short) 8) + ); + } + + private static Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata(Map<TopicIdPartition, Long> offsets) { + return offsets.entrySet().stream() + .map(e -> new SimpleEntry<>(e.getKey().topicPartition(), new OffsetAndMetadata(e.getValue(), "metadata"))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @ParameterizedTest + @MethodSource("commitOffsetTestArgs") + public void testTopicIdsArePopulatedByTheConsumerCoordinator( + boolean commitSync, + Map<TopicIdPartition, Long> offsets, + OffsetCommitRequestData expectedRequestData, + short expectedRequestVersion) { + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + OffsetCommitRequestCaptor captor = new OffsetCommitRequestCaptor(); + prepareOffsetCommitRequest(offsets, Errors.NONE, false, captor); + + Map<TopicPartition, OffsetAndMetadata> input = offsetAndMetadata(offsets); + + if (commitSync) { + assertTrue(coordinator.commitOffsetsSync(input, time.timer(Long.MAX_VALUE))); + + } else { + AtomicBoolean callbackInvoked = new AtomicBoolean(); + coordinator.commitOffsetsAsync(input, (inputOffsets, exception) -> { + // Notes: + // 1) The offsets passed to the callback are the same object provided to the offset commit method. + // The validation on the offsets is not required but defensive. + // 2) We validate that the commit was successful, which is the case if the exception is null. + // 3) We validate this callback was invoked, which is not necessary but defensive. 
+ assertSame(inputOffsets, input); + assertNull(exception); + callbackInvoked.set(true); + }); + + coordinator.invokeCompletedOffsetCommitCallbacks(); + assertTrue(callbackInvoked.get()); + } + + // The consumer does not provide a guarantee on the order of occurrence of topics and partitions in the + // OffsetCommit request, since a map of offsets is provided to the consumer API. Here, both requests + // are asserted to be identical irrespective of the order in which topic and partitions appear in the requests. + assertRequestEquals( + new OffsetCommitRequest(expectedRequestData, expectedRequestVersion), + captor.request + ); + } + + @ParameterizedTest + @NullSource + @ValueSource(strings = { "", "test1" }) + public void testInvalidTopicIdReturnedByBrokerWhenCommittingOffsetSync(String topicName) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "m")); + + // The following offset commit response is valid and the authorization failure results in failing the + // offset commit invocation. + client.prepareResponse(offsetCommitResponse(topicName, ti1p.topicId(), Errors.GROUP_AUTHORIZATION_FAILED)); + assertThrows(GroupAuthorizationException.class, + () -> coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE))); + + // The following offset commit response defines a topic incorrectly. The coordinator ignores the topic, + // and the group authorization failure is therefore not propagated. + client.prepareResponse(offsetCommitResponse(topicName, Uuid.ZERO_UUID, Errors.GROUP_AUTHORIZATION_FAILED)); + assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE))); Review Comment: This one made me think that we are probably not doing the right thing in the implementation. 
In this particular case, if we have only one committed offset and we don't have a response for it because the topic id is wrong, I think that `commitOffsetsSync` should not succeed because we actually don't know if the offset was committed or not. What do you think? One way around this would be to verify that we have received a response for each topic-partition. ########## clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java: ########## @@ -54,89 +68,204 @@ public class OffsetCommitRequestTest { protected static int throttleTimeMs = 10; private static OffsetCommitRequestData data; - private static List<OffsetCommitRequestTopic> topics; + private static OffsetCommitRequestPartition requestPartitionOne; + private static OffsetCommitRequestPartition requestPartitionTwo; @BeforeEach public void setUp() { - topics = Arrays.asList( - new OffsetCommitRequestTopic() - .setName(topicOne) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestPartition() - .setPartitionIndex(partitionOne) - .setCommittedOffset(offset) - .setCommittedLeaderEpoch(leaderEpoch) - .setCommittedMetadata(metadata) - )), - new OffsetCommitRequestTopic() - .setName(topicTwo) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestPartition() - .setPartitionIndex(partitionTwo) - .setCommittedOffset(offset) - .setCommittedLeaderEpoch(leaderEpoch) - .setCommittedMetadata(metadata) - )) - ); + requestPartitionOne = new OffsetCommitRequestPartition() + .setPartitionIndex(partitionOne) + .setCommittedOffset(offset) + .setCommittedLeaderEpoch(leaderEpoch) + .setCommittedMetadata(metadata); + + requestPartitionTwo = new OffsetCommitRequestPartition() + .setPartitionIndex(partitionTwo) + .setCommittedOffset(offset) + .setCommittedLeaderEpoch(leaderEpoch) + .setCommittedMetadata(metadata); + data = new OffsetCommitRequestData() - .setGroupId(groupId) - .setTopics(topics); + .setGroupId(groupId) + .setTopics(Arrays.asList( + new
OffsetCommitRequestTopic() + .setTopicId(topicOneId) + .setName(topicOne) + .setPartitions(Collections.singletonList(requestPartitionOne)), + new OffsetCommitRequestTopic() + .setTopicId(topicTwoId) + .setName(topicTwo) + .setPartitions(Collections.singletonList(requestPartitionTwo)) + )); } - @Test - public void testConstructor() { - Map<TopicPartition, Long> expectedOffsets = new HashMap<>(); - expectedOffsets.put(new TopicPartition(topicOne, partitionOne), offset); - expectedOffsets.put(new TopicPartition(topicTwo, partitionTwo), offset); + public static Map<TopicPartition, Long> offsets( + OffsetCommitRequest request, + TopicIdAndNameBiMap topicIdAndNames + ) { + Map<TopicPartition, Long> offsets = new HashMap<>(); + for (OffsetCommitRequestTopic topic : request.data().topics()) { + String topicName = topic.name(); - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data); + if (request.version() >= 9) { + topicName = topicIdAndNames.getTopicName(topic.topicId()).orElseThrow( + () -> new UnknownTopicIdException("Topic with ID " + topic.topicId() + " not found.")); + } - for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) { - OffsetCommitRequest request = builder.build(version); - assertEquals(expectedOffsets, request.offsets()); + for (OffsetCommitRequestData.OffsetCommitRequestPartition partition : topic.partitions()) { + offsets.put(new TopicPartition(topicName, partition.partitionIndex()), partition.committedOffset()); + } + } + return offsets; + } - OffsetCommitResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception()); + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testConstructor(short version) { + OffsetCommitRequest request = new OffsetCommitRequest.Builder(data, true).build(version); + OffsetCommitResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception()); - 
assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts()); - assertEquals(throttleTimeMs, response.throttleTimeMs()); - } + assertEquals(data, request.data()); + assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts()); + assertEquals(throttleTimeMs, response.throttleTimeMs()); } @Test public void testGetErrorResponseTopics() { List<OffsetCommitResponseTopic> expectedTopics = Arrays.asList( new OffsetCommitResponseTopic() .setName(topicOne) + .setTopicId(topicOneId) .setPartitions(Collections.singletonList( new OffsetCommitResponsePartition() .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) .setPartitionIndex(partitionOne))), new OffsetCommitResponseTopic() .setName(topicTwo) + .setTopicId(topicTwoId) .setPartitions(Collections.singletonList( new OffsetCommitResponsePartition() .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) .setPartitionIndex(partitionTwo))) ); - assertEquals(expectedTopics, getErrorResponseTopics(topics, Errors.UNKNOWN_MEMBER_ID)); + assertEquals(expectedTopics, getErrorResponseTopics(data.topics(), Errors.UNKNOWN_MEMBER_ID)); } - @Test - public void testVersionSupportForGroupInstanceId() { + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testVersionSupportForGroupInstanceId(short version) { OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( new OffsetCommitRequestData() .setGroupId(groupId) .setMemberId(memberId) - .setGroupInstanceId(groupInstanceId) + .setGroupInstanceId(groupInstanceId), + true ); - for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { - if (version >= 7) { - builder.build(version); - } else { - final short finalVersion = version; - assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion)); - } + if (version >= 7) { + builder.build(version); + } else { + final short finalVersion = version; + assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion)); 
+ } + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testResolvesTopicNameIfRequiredWhenListingOffsets(short version) { + OffsetCommitRequest request = new OffsetCommitRequest.Builder(data, true).build(version); + List<OffsetCommitRequestTopic> topics = request.data().topics(); + + assertEquals(2, topics.stream().flatMap(t -> t.partitions().stream()).count()); + assertEquals(requestPartitionOne, topics.get(0).partitions().get(0)); + assertEquals(requestPartitionTwo, topics.get(1).partitions().get(0)); + } + + @Test + public void testUnresolvableTopicIdWhenListingOffset() { + OffsetCommitRequest request = new OffsetCommitRequest.Builder(data.duplicate(), true).build((short) 9); + assertThrows(UnknownTopicIdException.class, + () -> OffsetCommitRequestTest.offsets(request, TopicIdAndNameBiMap.emptyMapping())); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void maxAllowedVersionIsEightIfRequestCannotUseTopicIds(boolean canUseTopicIds) { + OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data.duplicate(), canUseTopicIds); + assertEquals(canUseTopicIds ? 9 : 8, builder.build(builder.latestAllowedVersion()).version()); + } + + /** + * Compares the two {@link OffsetCommitRequest} independently of the order in which the + * {@link OffsetCommitRequestTopic} and {@link OffsetCommitRequestPartition} are defined in the response. + */ + public static void assertRequestEquals(OffsetCommitRequest expectedRequest, OffsetCommitRequest actualRequest) { + if (expectedRequest.version() > 9 || actualRequest.version() > 9) { + throw new AssertionError("A new version of OffsetCommitRequest has been detected. 
Please " + + "review the equality contract enforced here and add/remove fields accordingly."); } + + OffsetCommitRequestData expected = expectedRequest.data(); + OffsetCommitRequestData actual = actualRequest.data(); + + assertEquals(expectedRequest.version(), actualRequest.version()); + assertEquals(expected.groupId(), actual.groupId(), "Group id mismatch"); + assertEquals(expected.groupInstanceId(), actual.groupInstanceId(), "Group instance id mismatch"); + assertEquals(expected.generationId(), actual.generationId(), "Generation id mismatch"); + assertEquals(expected.memberId(), actual.memberId(), "Member id mismatch"); + assertEquals(expected.retentionTimeMs(), actual.retentionTimeMs(), "Retention time mismatch"); + + Function<OffsetCommitRequestTopic, List<OffsetCommitRequestPartition>> partitionSelector = + OffsetCommitRequestTopic::partitions; + + Function<OffsetCommitRequestTopic, NameAndId> topicClassifier = + topic -> new NameAndId(topic.name(), topic.topicId()); + + BiFunction<NameAndId, OffsetCommitRequestPartition, TopicIdPartition> partitionClassifier = + (nameAndId, p) -> new TopicIdPartition(nameAndId.id(), p.partitionIndex(), nameAndId.name()); + + Function<OffsetCommitRequestData, Map<TopicIdPartition, OffsetCommitRequestPartition>> partitioner = + request -> partition(request.topics(), partitionSelector, topicClassifier, partitionClassifier); + + assertEquals(partitioner.apply(expected), partitioner.apply(actual)); + } + + /** + * Compares the two {@link OffsetCommitResponse} independently of the order in which the + * {@link OffsetCommitResponseTopic} and {@link OffsetCommitResponsePartition} are defined in the response. 
+ */ + public static void assertResponseEquals(OffsetCommitResponse expected, OffsetCommitResponse actual) { + assertEquals(expected.throttleTimeMs(), actual.throttleTimeMs()); + assertEquals(expected.errorCounts(), actual.errorCounts()); + + Function<OffsetCommitResponseTopic, List<OffsetCommitResponsePartition>> partitionSelector = + OffsetCommitResponseTopic::partitions; + + Function<OffsetCommitResponseTopic, NameAndId> topicClassifier = + topic -> new NameAndId(topic.name(), topic.topicId()); + + BiFunction<NameAndId, OffsetCommitResponsePartition, TopicIdPartition> partitionClassifier = + (nameAndId, p) -> new TopicIdPartition(nameAndId.id(), p.partitionIndex(), nameAndId.name()); + + Function<OffsetCommitResponse, Map<TopicIdPartition, OffsetCommitResponsePartition>> partitioner = + response -> partition(response.data().topics(), partitionSelector, topicClassifier, partitionClassifier); + + assertEquals(partitioner.apply(expected), partitioner.apply(actual)); + } + + private static <T, P> Map<TopicIdPartition, P> partition( Review Comment: This seems to be a quite complicated way to group `OffsetCommitRequestPartition` or `OffsetCommitResponsePartition` by `TopicIdPartition`, no? I would just write two methods to do just this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org