dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1113132735
########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java: ########## @@ -59,7 +63,36 @@ public OffsetCommitRequest build(short version) { throw new UnsupportedVersionException("The broker offset commit protocol version " + version + " does not support usage of config group.instance.id."); } - return new OffsetCommitRequest(data, version); + + // Copy since we can mutate it. + OffsetCommitRequestData requestData = data.duplicate(); Review Comment: nit: We probably don't need to duplicate `data` here. I understand why you are doing it but in practice we assume that `data` is owned by the builder once it is given to it. ########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java: ########## @@ -78,11 +111,18 @@ public OffsetCommitRequestData data() { return data; } - public Map<TopicPartition, Long> offsets() { + public Map<TopicPartition, Long> offsets(TopicResolver topicResolver) { Review Comment: I just realized that this is only used in tests. I wonder if we should just get rid of it and use the auto-generated classes in tests as well. ########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java: ########## @@ -59,7 +63,36 @@ public OffsetCommitRequest build(short version) { throw new UnsupportedVersionException("The broker offset commit protocol version " + version + " does not support usage of config group.instance.id."); } - return new OffsetCommitRequest(data, version); + + // Copy since we can mutate it. + OffsetCommitRequestData requestData = data.duplicate(); + + if (version >= 9) { + requestData.topics().forEach(topic -> { + // Set the topic name to null if a topic ID for the topic is present. If no topic ID is + // provided (i.e. its value is ZERO_UUID), the client should provide a topic name as a + // fallback. This allows the OffsetCommit API to support both topic IDs and topic names + // inside the same request or response. + if (!Uuid.ZERO_UUID.equals(topic.topicId())) { + topic.setName(null); + } else if (topic.name() == null || "".equals(topic.name())) { + // Fail-fast the entire request. This means that a single invalid topic in a multi-topic + // request will make it fail. We may want to relax the constraint to allow the request + // with valid topics (i.e. for which a valid ID or name was provided) exist in the request. + throw new UnknownTopicOrPartitionException( Review Comment: nit: InvalidRequestException would be more appropriate. ########## clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java: ########## @@ -139,4 +151,65 @@ public void testVersionSupportForGroupInstanceId() { } } } + + @Test + public void testHandlingOfTopicIdInAllVersions() { + for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { + OffsetCommitRequest request = new OffsetCommitRequest.Builder(data).build(version); + List<OffsetCommitRequestTopic> requestTopics = request.data().topics(); + + if (version >= 9) { + // Version >= 9: + // Topic ID may be present or not. Both are valid cases. If no topic ID is provided (null or + // set to ZERO_UUID), a topic name must be provided and will be used. If a topic ID is provided, + // the name will be nullified. + assertNull(requestTopics.get(0).name()); + assertEquals(topicOneId, requestTopics.get(0).topicId()); + + assertEquals(topicTwo, requestTopics.get(1).name()); + assertEquals(Uuid.ZERO_UUID, requestTopics.get(1).topicId()); + + } else { + // Version < 9: + // Topic ID may be present or not. 
They are set to ZERO_UUID in the finalized request. Any other + // value would make serialization of the request fail. + assertEquals(topicOne, requestTopics.get(0).name()); + assertEquals(Uuid.ZERO_UUID, requestTopics.get(0).topicId()); + + assertEquals(topicTwo, requestTopics.get(1).name()); + assertEquals(Uuid.ZERO_UUID, requestTopics.get(1).topicId()); + } + } + } + + @Test + public void testTopicIdMustBeSetIfNoTopicNameIsProvided() { + OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic() + .setPartitions(Collections.singletonList(requestPartitionOne)); + OffsetCommitRequestData data = new OffsetCommitRequestData() + .setGroupId(groupId) + .setTopics(Collections.singletonList(topic)); + + assertThrows(UnknownTopicOrPartitionException.class, () -> new OffsetCommitRequest.Builder(data).build((short) 9)); + } + + @Test + public void testResolvesTopicNameIfRequiredWhenListingOffsets() { + for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { Review Comment: nit: ditto. ########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ########## @@ -151,11 +154,12 @@ public Builder addPartition( public <P> Builder addPartitions( String topicName, + Uuid topicId, List<P> partitions, Function<P, Integer> partitionIndex, Errors error ) { - final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName, topicId); Review Comment: I think that there is a bug here for the case where multiple topic ids are unknown in a single request. For those, the topic name will be null, so they will all be aggregated into the same OffsetCommitResponseTopic, and that entry will carry the topic id of the first unknown topic id seen. (See the sketch below, after the OffsetCommitResponse.json comment.) ########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ########## @@ -192,8 +196,27 @@ public Builder merge( return this; } - public OffsetCommitResponse build() { - return new OffsetCommitResponse(data); + public OffsetCommitResponse build(short version) { + // Copy since we can mutate it. + OffsetCommitResponseData responseData = data.duplicate(); + + if (version >= 9) { + responseData.topics().forEach(topic -> { + // Set the topic name to null if a topic ID for the topic is present. + if (!Uuid.ZERO_UUID.equals(topic.topicId())) { + topic.setName(null); + } + }); + } else { + responseData.topics().forEach(topic -> { + // Topic must be set to default for version < 9. + if (!Uuid.ZERO_UUID.equals(topic.topicId())) { + topic.setTopicId(Uuid.ZERO_UUID); + } + // Topic name must not be null. Validity will be checked at serialization time. Review Comment: nit: I think that we could remove this comment. It does not add much. ########## clients/src/main/resources/common/message/OffsetCommitResponse.json: ########## @@ -28,15 +28,19 @@ // Version 7 offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts. // // Version 8 is the first flexible version. - "validVersions": "0-8", + // + // Version 9 adds TopicId field (KIP-848). Review Comment: The KIP also specifies new errors for this version. Could we mention them here?
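To make the aggregation issue flagged above in `OffsetCommitResponse.Builder#addPartitions` concrete, here is a rough JUnit-style sketch. It assumes the patched builder API shown in the diff (`addPartitions(name, topicId, partitions, partitionIndex, error)` and `build(short)`); the fixture values are made up.

```java
import java.util.function.Function;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitResponse;

import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;

class OffsetCommitResponseBuilderAggregationSketch {

    @org.junit.jupiter.api.Test
    void twoUnknownTopicIdsShouldYieldTwoResponseTopics() {
        // Two topics whose ids could not be resolved to names are both added with a null name.
        Uuid unknownId1 = Uuid.randomUuid();
        Uuid unknownId2 = Uuid.randomUuid();

        OffsetCommitResponse response = new OffsetCommitResponse.Builder()
            .addPartitions(null, unknownId1, singletonList(0), Function.identity(), Errors.UNKNOWN_TOPIC_ID)
            .addPartitions(null, unknownId2, singletonList(0), Function.identity(), Errors.UNKNOWN_TOPIC_ID)
            .build((short) 9);

        // If the builder keys its topic lookup on the (null) name, both partitions collapse into a
        // single OffsetCommitResponseTopic tagged with unknownId1, and this assertion fails.
        assertEquals(2, response.data().topics().size());
    }
}
```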
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -1369,7 +1387,7 @@ public void testJoinPrepareWithDisableAutoCommit() { try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) { coordinator.ensureActiveGroup(); - prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); + prepareOffsetCommitRequest(new OffsetCommitResponseSpec().expectedOffsets(singletonMap(t1p, 100L))); Review Comment: What's the reason for this change? If we refactor this, it may be better to directly go with the auto-generated data structures. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -425,35 +425,73 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val topicNames = + if (offsetCommitRequest.version() >= 9) + metadataCache.topicIdsToNames() + else + Collections.emptyMap[Uuid, String]() + + // For version < 9, lookup from topicNames fails and the topic name (which cannot be null) is returned. + // For version >= 9, if lookup from topicNames fails, there are two possibilities: + // + // a) The topic ID was left to default and the topic name should have been populated as a fallback instead. + // If none was provided, null is returned. + // + // b) The topic ID was not default but is not present in the local topic IDs cache. In this case, because + // clients should make exclusive use of topic name or topic ID, the topic name should be null. If however + // the client provided a topic name, we do not want to use it, because any topic with the same name + // present locally would then have a topic ID which does not match the topic ID in the request. + def resolveTopicName(topic: OffsetCommitRequestData.OffsetCommitRequestTopic): Option[String] = { + val resolvedFromId = topicNames.get(topic.topicId()) + if (resolvedFromId != null) + Some(resolvedFromId) + else if (offsetCommitRequest.version() < 9 || Uuid.ZERO_UUID.equals(topic.topicId)) { + Option(topic.name()) + } else { + None + } + } + + offsetCommitRequest.data.topics.forEach { topic => resolveTopicName(topic).foreach(topic.setName _) } Review Comment: I would prefer to inline `resolveTopicName` and avoid allocating an `Option`, which does not bring much here. At the same time, I would directly construct the list of topic names for the authorizer at L461. This way, we would avoid re-iterating over the topics and the `filter`. What do you think? Moreover, the KIP states that an `INVALID_REQUEST` should be returned if both a topic id and a topic name are provided. We could also handle this here. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -1351,8 +1351,11 @@ class KafkaApisTest { @Test def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = { Review Comment: It would be great if we could extend the tests here. I think that we need to use multiple unresolvable topic ids in the same request and also check the different versions. I am not sure whether we can extend this one or whether we should add new ones. ########## clients/src/main/resources/common/message/OffsetCommitRequest.json: ########## @@ -47,8 +49,10 @@ "about": "The time period in ms to retain the offset."
}, { "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+", "about": "The topics to commit offsets for.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "nullableVersions": "9+", "entityType": "topicName", "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "9+", + "about": "The unique topic ID" }, Review Comment: nit: Could we add `.` at the end? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -425,35 +425,72 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val topicNames = + if (offsetCommitRequest.version() >= 9) + metadataCache.topicIdsToNames() + else + Collections.emptyMap[Uuid, String]() + + // For version < 9, lookup from topicNames fails and the topic name (which cannot be null) is returned. + // For version >= 9, if lookup from topicNames fails, there are two possibilities: + // + // a) The topic ID was left to default and the topic name should have been populated as a fallback instead. + // If none was provided, null is returned. + // + // b) The topic ID was not default but is not present in the local topic IDs cache. In this case, because + // clients should make exclusive use of topic name or topic ID, the topic name should be null. If however + // the client provided a topic name, we do not want to use it, because any topic with the same name + // present locally would then have a topic ID which does not match the topic ID in the request. + def resolveTopicName(topic: OffsetCommitRequestData.OffsetCommitRequestTopic): String = { + val resolvedFromId = topicNames.get(topic.topicId()) + if (resolvedFromId != null) + resolvedFromId + else if (offsetCommitRequest.version() < 9 || Uuid.ZERO_UUID.equals(topic.topicId)) { + topic.name() + } else { + null + } + } + val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, TOPIC, - offsetCommitRequest.data.topics.asScala - )(_.name) + offsetCommitRequest.data.topics.asScala.filter(topic => resolveTopicName(topic) != null) + )(resolveTopicName) val responseBuilder = new OffsetCommitResponse.Builder() val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => - if (!authorizedTopics.contains(topic.name)) { + val topicName = resolveTopicName(topic) + if (topicName == null) { + // Topic name cannot be null for version < 9. From version >= 9, topicName is null iff it cannot + // be resolved from the local topic IDs cache or topic ID was left to default but no fallback topic + // name was provided. + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( Review Comment: This issue is still present. Yeah, we definitely need to update the response builder to support this. One way would be to change the semantic of `addPartitions` to directly add to the response when it is called and to only put the topic in the HashMap when `addPartition` is used. 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2604,11 +2650,95 @@ public void testCommitOffsetUnknownMemberId() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID); + prepareOffsetCommitRequest(new OffsetCommitResponseSpec() + .expectedOffsets(singletonMap(t1p, 100L)) + .error(Errors.UNKNOWN_MEMBER_ID)); + assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE))); } + @Test + public void testCommitOffsetUnknownTopicId() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error. + // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt. + // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked. + // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this + // timer never moves forward once the network client is invoked. If there is no available + // response to consume, its internal poll loop never completes. Hence, the timeout needs to be + // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses + // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.). + int offsetCommitCalls = 5; + long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls; + + IntStream.range(0, offsetCommitCalls).forEach(__ -> + prepareOffsetCommitRequest(new OffsetCommitResponseSpec() + .expectedOffsets(singletonMap(t1p, 100L)) + .error(Errors.UNKNOWN_TOPIC_ID))); + + // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as + // the timeout allows. Note that since topic ids are not part of the public API of the consumer, + // we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the + // offset commit failed is returned. + assertFalse(coordinator.commitOffsetsSync(singletonMap(t1p, + new OffsetAndMetadata(100L, "metadata")), time.timer(timeoutMs))); + } + + @Test + public void testRetryCommitUnknownTopicId() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_ID))); + client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE))); + + assertTrue(coordinator.commitOffsetsSync(singletonMap(t1p, + new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE))); + } + + @Test + public void testTopicIdsArePopulatedByTheConsumerCoordinator() { Review Comment: We also need tests to check if the response is handled correctly. 
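For the response-handling side, a hedged sketch of the kind of test that could be added to ConsumerCoordinatorTest, reusing the fixtures visible in the diff (`client`, `coordinator`, `node`, `time`, `t1p`). The `topicOneId` fixture and the `setTopicId` setter on the response topic are assumptions based on the patched v9 schema, not existing code.

```java
@Test
public void testCommitOffsetResponseWithTopicIdOnlyIsResolved() {
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

    // Build a v9-style response that identifies the topic by id only (name left null).
    // topicOneId is assumed to be the topic id the client associates with t1p.
    OffsetCommitResponseData responseData = new OffsetCommitResponseData()
        .setTopics(singletonList(new OffsetCommitResponseData.OffsetCommitResponseTopic()
            .setTopicId(topicOneId)
            .setName(null)
            .setPartitions(singletonList(new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(t1p.partition())
                .setErrorCode(Errors.NONE.code())))));
    client.prepareResponse(new OffsetCommitResponse(responseData));

    // The coordinator should resolve the topic id back to t1p and report a successful commit.
    assertTrue(coordinator.commitOffsetsSync(
        singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")),
        time.timer(Long.MAX_VALUE)));
}
```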
########## clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java: ########## @@ -139,4 +151,65 @@ public void testVersionSupportForGroupInstanceId() { } } } + + @Test + public void testHandlingOfTopicIdInAllVersions() { + for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { Review Comment: nit: You could replace this by the following: ``` @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) ``` ########## clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java: ########## @@ -139,4 +151,65 @@ public void testVersionSupportForGroupInstanceId() { } } } + + @Test + public void testHandlingOfTopicIdInAllVersions() { + for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { + OffsetCommitRequest request = new OffsetCommitRequest.Builder(data).build(version); + List<OffsetCommitRequestTopic> requestTopics = request.data().topics(); + + if (version >= 9) { + // Version >= 9: + // Topic ID may be present or not. Both are valid cases. If no topic ID is provided (null or + // set to ZERO_UUID), a topic name must be provided and will be used. If a topic ID is provided, + // the name will be nullified. + assertNull(requestTopics.get(0).name()); + assertEquals(topicOneId, requestTopics.get(0).topicId()); + + assertEquals(topicTwo, requestTopics.get(1).name()); + assertEquals(Uuid.ZERO_UUID, requestTopics.get(1).topicId()); + Review Comment: nit: We could remove this empty line. ########## clients/src/main/resources/common/message/OffsetCommitResponse.json: ########## @@ -28,15 +28,19 @@ // Version 7 offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts. // // Version 8 is the first flexible version. - "validVersions": "0-8", + // + // Version 9 adds TopicId field (KIP-848). + "validVersions": "0-9", "flexibleVersions": "8+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": "0+", "about": "The responses for each topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "nullableVersions": "9+", "entityType": "topicName", "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "9+", + "about": "The unique topic ID" }, Review Comment: nit: Could we add `.` at the end? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -1374,7 +1379,8 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { unauthorizedTopics.add(tp.topic()); } else if (error == Errors.OFFSET_METADATA_TOO_LARGE - || error == Errors.INVALID_COMMIT_OFFSET_SIZE) { + || error == Errors.INVALID_COMMIT_OFFSET_SIZE + || error == Errors.UNKNOWN_TOPIC_ID) { Review Comment: At L1361 in this file, we construct `TopicPartition` based on the response data but we don't resolve the topic id. I think that we should add the resolution there as well, no? We probably need to extend tests to better cover this as well. Regarding `UNKNOWN_TOPIC_ID`, would it make sense to place it after `UNKNOWN_TOPIC_OR_PARTITION` as they are quite similar? 
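To illustrate the resolution being asked for, a minimal self-contained sketch. This is not the actual ConsumerCoordinator code: the id-to-name map is an assumption about where the mapping would come from (for example, captured from the request that was sent, or from metadata), and `topicId()` on the response topic assumes the patched v9 schema.

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitResponseData;

final class OffsetCommitResponseTopicResolution {

    // For version >= 9 the broker may return only the topic id, so fall back to a local
    // id-to-name lookup before building the TopicPartition used to key the offsets map.
    static String resolveTopicName(
        OffsetCommitResponseData.OffsetCommitResponseTopic topic,
        Map<Uuid, String> topicNamesById
    ) {
        if (topic.name() != null && !topic.name().isEmpty()) {
            return topic.name();
        }
        return topicNamesById.get(topic.topicId());
    }

    static TopicPartition toTopicPartition(
        OffsetCommitResponseData.OffsetCommitResponseTopic topic,
        OffsetCommitResponseData.OffsetCommitResponsePartition partition,
        Map<Uuid, String> topicNamesById
    ) {
        return new TopicPartition(resolveTopicName(topic, topicNamesById), partition.partitionIndex());
    }
}
```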