[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664918688 ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -1358,17 +1367,222 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's only one broker, so no need to lookup the group coordinator // without describe permission on the topic, we shouldn't be able to fetch offsets -val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, false, null, false).build() +val offsetFetchRequest = createOffsetFetchRequestAllPartitions var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) -assertEquals(Errors.NONE, offsetFetchResponse.error) -assertTrue(offsetFetchResponse.responseData.isEmpty) +assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) +assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty) // now add describe permission on the topic and verify that the offset can be fetched addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) -assertEquals(Errors.NONE, offsetFetchResponse.error) -assertTrue(offsetFetchResponse.responseData.containsKey(tp)) -assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset) +assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) +assertTrue(offsetFetchResponse.partitionDataMap(group).containsKey(tp)) +assertEquals(offset, offsetFetchResponse.partitionDataMap(group).get(tp).offset) + } + + @Test + def testOffsetFetchMultipleGroupsAuthorization(): Unit = { +val groups = (0 until 5).map(i => s"group$i") +val groupResources = groups.map(group => new ResourcePattern(GROUP, group, LITERAL)) + +val topic1 = "topic1" +val topic1List = singletonList(new TopicPartition(topic1, 0)) +val topicOneResource = new ResourcePattern(TOPIC, 
topic1, LITERAL) +val topic2 = "topic2" +val topic1And2List = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) +val topicTwoResource = new ResourcePattern(TOPIC, topic2, LITERAL) +val topic3 = "topic3" +val allTopicsList = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1), + new TopicPartition(topic3, 0), + new TopicPartition(topic3, 1), + new TopicPartition(topic3, 2)) +val topicThreeResource = new ResourcePattern(TOPIC, topic3, LITERAL) + +// create group to partition map to build batched offsetFetch request +val groupToPartitionMap = new util.HashMap[String, util.List[TopicPartition]]() +groupToPartitionMap.put(groups(1), topic1List) Review comment: groups(0) to group(4) ## File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala ## @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData +import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{BeforeEach, Test} + +import java.util +import java.util.Collections.singletonList +import scala.jdk.CollectionConverters._ +import java.util.{Optional, Properties} + +class OffsetFetchRequestTest extends BaseRequestTest { + + override def brokerCount: Int = 1 + + val brokerId: Integer = 0 + val offset = 15L + val leaderEpoch: Optional[Integer] = Optional.of(3) + val metadata = "metadata" + val topic = "topic" + val groupId = "groupId" + val groups: Seq[String] = (0 until 5).map(i =>
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664880346 ## File path: clients/src/main/resources/common/message/OffsetFetchResponse.json ## @@ -30,30 +30,57 @@ // Version 6 is the first flexible version. // // Version 7 adds pending offset commit as new error response on partition level. - "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, -{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+", +{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7", "about": "The responses per topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name." }, - { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+", + { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7", "about": "The responses per partition", "fields": [ -{ "name": "PartitionIndex", "type": "int32", "versions": "0+", +{ "name": "PartitionIndex", "type": "int32", "versions": "0-7", "about": "The partition index." }, -{ "name": "CommittedOffset", "type": "int64", "versions": "0+", +{ "name": "CommittedOffset", "type": "int64", "versions": "0-7", "about": "The committed message offset." }, -{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1", +{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1", "ignorable": true, "about": "The leader epoch." 
}, -{ "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+", +{ "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7", "about": "The partition metadata." }, -{ "name": "ErrorCode", "type": "int16", "versions": "0+", +{ "name": "ErrorCode", "type": "int16", "versions": "0-7", "about": "The error code, or 0 if there was no error." } ]} ]}, -{ "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true, - "about": "The top-level error code, or 0 if there was no error." } +{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true, + "about": "The top-level error code, or 0 if there was no error." }, +{"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": "8+", Review comment: As with the response, should we call this `Groups` rather than `GroupIds`? ## File path: clients/src/main/resources/common/message/OffsetFetchRequest.json ## @@ -31,19 +31,33 @@ // Version 6 is the first flexible version. // // Version 7 is adding the require stable flag. - "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups at a time + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ -{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", +{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId", "about": "The group to fetch offsets for." 
}, -{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+", +{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7", "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name."}, - { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", + { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7", "about": "The partition indexes we would like to fetch offsets for." } ]}, +{ "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": "8+", Review comment: Should we call this `Groups` rather than `GroupIds` since it is not just the group id? ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -1358,17 +1367,233 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664873858 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: You can probably set the list in `OffsetFetchRequestData` directly in the test. But let's leave that for a follow-on PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664806203 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: In the same test class `OffsetFetchRequestTest.scala`, add a test with v8 with a batched request where the same group appears twice, perhaps with different topics. The response should be either InvalidRequestException, if we want to treat it as an error, OR actual offsets, if we handle the request correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664804113 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); Review comment: The helper method `toSingleton()` throws IllegalStateException if the list size is greater than one. If a request contains the same group twice, it can appear twice in the list. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664798710 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: Can we add a test in the new `kafka.server.OffsetFetchRequestTest`? We can do that in a separate PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664796679 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture> future) { -if (response.hasError()) { -Errors error = response.error(); -log.debug("Offset fetch failed: {}", error.message()); +Errors responseError = response.groupLevelError(rebalanceConfig.groupId); +if (responseError != Errors.NONE) { +log.debug("Offset fetch failed: {}", responseError.message()); -if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { // just retry -future.raise(error); -} else if (error == Errors.NOT_COORDINATOR) { +future.raise(responseError); +} else if (responseError == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry -markCoordinatorUnknown(error); -future.raise(error); -} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { +markCoordinatorUnknown(responseError); +future.raise(responseError); +} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { -future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); +future.raise(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); } return; } Set unauthorizedTopics = null; -Map offsets = new HashMap<>(response.responseData().size()); +Map responseData = Review comment: ok, let's leave as is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664795285 ## File path: clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java ## @@ -76,62 +73,169 @@ public void testConstructor() { } for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { -OffsetFetchRequest request = builder.build(version); -assertFalse(request.isAllPartitions()); -assertEquals(groupId, request.groupId()); -assertEquals(partitions, request.partitions()); - -OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); -assertEquals(Errors.NONE, response.error()); -assertFalse(response.hasError()); -assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 3 : 1), response.errorCounts(), -"Incorrect error count for version " + version); - -if (version <= 1) { -assertEquals(expectedData, response.responseData()); +if (version < 8) { +builder = new OffsetFetchRequest.Builder( +group1, +false, +partitions, +false); +assertFalse(builder.isAllTopicPartitions()); +OffsetFetchRequest request = builder.build(version); +assertFalse(request.isAllPartitions()); +assertEquals(group1, request.groupId()); +assertEquals(partitions, request.partitions()); + +OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); +assertEquals(Errors.NONE, response.error()); +assertFalse(response.hasError()); +assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 
3 : 1), response.errorCounts(), +"Incorrect error count for version " + version); + +if (version <= 1) { +assertEquals(expectedData, response.responseDataV0ToV7()); +} + +if (version >= 3) { +assertEquals(throttleTimeMs, response.throttleTimeMs()); +} else { +assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); +} +} else { +builder = new Builder(Collections.singletonMap(group1, partitions), false, false); +OffsetFetchRequest request = builder.build(version); +Map> groupToPartitionMap = +request.groupIdsToPartitions(); +Map> groupToTopicMap = +request.groupIdsToTopics(); +assertFalse(request.isAllPartitionsForGroup(group1)); +assertTrue(groupToPartitionMap.containsKey(group1) && groupToTopicMap.containsKey( +group1)); +assertEquals(partitions, groupToPartitionMap.get(group1)); +OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); +assertEquals(Errors.NONE, response.groupLevelError(group1)); +assertFalse(response.groupHasError(group1)); +assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts(), +"Incorrect error count for version " + version); +assertEquals(throttleTimeMs, response.throttleTimeMs()); } +} +} + +@Test +public void testConstructorWithMultipleGroups() { +List topic1Partitions = Arrays.asList( +new TopicPartition(topicOne, partitionOne), +new TopicPartition(topicOne, partitionTwo)); +List topic2Partitions = Arrays.asList( +new TopicPartition(topicTwo, partitionOne), +new TopicPartition(topicTwo, partitionTwo)); +List topic3Partitions = Arrays.asList( +new TopicPartition(topicThree, partitionOne), +new TopicPartition(topicThree, partitionTwo)); +Map> groupToTp = new HashMap<>(); +groupToTp.put(group1, topic1Partitions); +groupToTp.put(group2, topic2Partitions); +groupToTp.put(group3, topic3Partitions); +groupToTp.put(group4, null); +groupToTp.put(group5, null); +int throttleTimeMs = 10; -if (version >= 3) { +for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { +if (version >= 8) { 
+builder = new Builder(groupToTp, false, false); +OffsetFetchRequest request = builder.build(version); +Map> groupToPartitionMap = +request.groupIdsToPartitions(); +Map> groupToTopicMap = +request.groupIdsToTopics(); +assertEquals(groupToTp.keySet(),
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664793097 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -78,26 +85,107 @@ boolean isAllTopicPartitions() { return this.data.topics() == ALL_TOPIC_PARTITIONS; } +public Builder(Map> groupIdToTopicPartitionMap, +boolean requireStable, +boolean throwOnFetchStableOffsetsUnsupported) { +super(ApiKeys.OFFSET_FETCH); + +List groups = new ArrayList<>(); +for (Entry> entry : groupIdToTopicPartitionMap.entrySet()) { +String groupName = entry.getKey(); +List tpList = entry.getValue(); +final List topics; +if (tpList != null) { +Map offsetFetchRequestTopicMap = +new HashMap<>(); +for (TopicPartition topicPartition : tpList) { +String topicName = topicPartition.topic(); +OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault( +topicName, new OffsetFetchRequestTopics().setName(topicName)); + topic.partitionIndexes().add(topicPartition.partition()); +offsetFetchRequestTopicMap.put(topicName, topic); +} +topics = new ArrayList<>(offsetFetchRequestTopicMap.values()); +} else { +topics = ALL_TOPIC_PARTITIONS_BATCH; +} +groups.add(new OffsetFetchRequestGroup() +.setGroupId(groupName) +.setTopics(topics)); +} +this.data = new OffsetFetchRequestData() +.setGroupIds(groups) +.setRequireStable(requireStable); +this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; +} + @Override public OffsetFetchRequest build(short version) { if (isAllTopicPartitions() && version < 2) { throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " + "v" + version + ", but we need v2 or newer to request all topic partitions."); } - +if (data.groupIds().size() > 1 && version < 8) { +throw new NoBatchedOffsetFetchRequestException("Broker does not support" ++ " batching groups for fetch offset request on version " + version); +} if (data.requireStable() && version < 7) { 
if (throwOnFetchStableOffsetsUnsupported) { throw new UnsupportedVersionException("Broker unexpectedly " + "doesn't support requireStable flag on version " + version); } else { log.trace("Fallback the requireStable flag to false as broker " + - "only supports OffsetFetchRequest version {}. Need " + - "v7 or newer to enable this feature", version); +"only supports OffsetFetchRequest version {}. Need " + +"v7 or newer to enable this feature", version); return new OffsetFetchRequest(data.setRequireStable(false), version); } } - +if (version < 8) { +OffsetFetchRequestData oldDataFormat = null; +if (!data.groupIds().isEmpty()) { +OffsetFetchRequestGroup group = data.groupIds().get(0); +String groupName = group.groupId(); +List topics = group.topics(); +List oldFormatTopics = null; +if (topics != null) { +oldFormatTopics = topics +.stream() +.map(t -> +new OffsetFetchRequestTopic() +.setName(t.name()) +.setPartitionIndexes(t.partitionIndexes())) +.collect(Collectors.toList()); +} +oldDataFormat = new OffsetFetchRequestData() +.setGroupId(groupName) +.setTopics(oldFormatTopics) +.setRequireStable(data.requireStable()); +} +return new OffsetFetchRequest(oldDataFormat == null ? data : oldDataFormat, version); +} +// version 8 but have used old format of request, convert to version 8 of request Review comment: At the moment, the code seems to do: ``` if (version < 8) { do-conversion-if-necessary } //
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664785624 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture> future) { -if (response.hasError()) { -Errors error = response.error(); -log.debug("Offset fetch failed: {}", error.message()); +Errors responseError = response.groupLevelError(rebalanceConfig.groupId); Review comment: we can leave as is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r663488495 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture> future) { -if (response.hasError()) { -Errors error = response.error(); -log.debug("Offset fetch failed: {}", error.message()); +Errors responseError = response.groupLevelError(rebalanceConfig.groupId); Review comment: We could just call this `error` and then won't require the remaining changes below. ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -78,26 +85,107 @@ boolean isAllTopicPartitions() { return this.data.topics() == ALL_TOPIC_PARTITIONS; } +public Builder(Map> groupIdToTopicPartitionMap, +boolean requireStable, Review comment: nit: indentation ## File path: clients/src/test/java/org/apache/kafka/common/message/MessageTest.java ## @@ -662,20 +669,179 @@ public void testOffsetFetchVersions() throws Exception { .setErrorCode(Errors.NOT_COORDINATOR.code()) .setThrottleTimeMs(10); for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { -OffsetFetchResponseData responseData = response.get(); -if (version <= 1) { -responseData.setErrorCode(Errors.NONE.code()); +if (version < 8) { Review comment: As before, we should have `if version >= 8` as well. 
## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java ## @@ -87,12 +87,16 @@ public String apiName() { Map failed = new HashMap<>(); List unmapped = new ArrayList<>(); -if (response.error() != Errors.NONE) { -handleError(groupId, response.error(), failed, unmapped); +Errors responseError = response.groupLevelError(groupId.idValue); +if (responseError != Errors.NONE) { +handleError(groupId, responseError, failed, unmapped); } else { final Map groupOffsetsListing = new HashMap<>(); -for (Map.Entry entry : -response.responseData().entrySet()) { +// if entry for group level response data is null, we are getting back an older version +// of the response Review comment: Is this comment required? ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1255,81 +1256,143 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle an offset fetch request */ def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = { +val version = request.header.apiVersion +if (version == 0) { + // reading offsets from ZK + handleOffsetFetchRequestV0(request) +} else if (version >= 1 && version <= 7) { + // reading offsets from Kafka + handleOffsetFetchRequestBetweenV1AndV7(request) +} else { + // batching offset reads for multiple groups starts with version 8 and greater + handleOffsetFetchRequestV8AndAbove(request) +} + } + + private def handleOffsetFetchRequestV0(request: RequestChannel.Request): Unit = { val header = request.header val offsetFetchRequest = request.body[OffsetFetchRequest] -def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], Seq[TopicPartition]) = - authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic) - def createResponse(requestThrottleMs: Int): AbstractResponse = { val offsetFetchResponse = -// reject the request if not authorized to the group + // reject the request if not authorized to the group Review comment: nit: revert 
indentation change ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -1358,17 +1367,241 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's only one broker, so no need to lookup the group coordinator // without describe permission on the topic, we shouldn't be able to fetch offsets -val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, false, null, false).build() +val offsetFetchRequest = createOffsetFetchRequestAllPartitions var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) -assertEquals(Errors.NONE, offsetFetchResponse.error) -assertTrue(offsetFetchResponse.responseData.isEmpty) +assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) +
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r663488495 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture> future) { -if (response.hasError()) { -Errors error = response.error(); -log.debug("Offset fetch failed: {}", error.message()); +Errors responseError = response.groupLevelError(rebalanceConfig.groupId); Review comment: We could just call this `error` and then won't require the remaining changes below. ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -78,26 +85,107 @@ boolean isAllTopicPartitions() { return this.data.topics() == ALL_TOPIC_PARTITIONS; } +public Builder(Map> groupIdToTopicPartitionMap, +boolean requireStable, Review comment: nit: indentation ## File path: clients/src/test/java/org/apache/kafka/common/message/MessageTest.java ## @@ -662,20 +669,179 @@ public void testOffsetFetchVersions() throws Exception { .setErrorCode(Errors.NOT_COORDINATOR.code()) .setThrottleTimeMs(10); for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { -OffsetFetchResponseData responseData = response.get(); -if (version <= 1) { -responseData.setErrorCode(Errors.NONE.code()); +if (version < 8) { Review comment: As before, we should have `if version >= 8` as well. 
## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java ## @@ -87,12 +87,16 @@ public String apiName() { Map failed = new HashMap<>(); List unmapped = new ArrayList<>(); -if (response.error() != Errors.NONE) { -handleError(groupId, response.error(), failed, unmapped); +Errors responseError = response.groupLevelError(groupId.idValue); +if (responseError != Errors.NONE) { +handleError(groupId, responseError, failed, unmapped); } else { final Map groupOffsetsListing = new HashMap<>(); -for (Map.Entry entry : -response.responseData().entrySet()) { +// if entry for group level response data is null, we are getting back an older version +// of the response Review comment: Is this comment required? ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1255,81 +1256,143 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle an offset fetch request */ def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = { +val version = request.header.apiVersion +if (version == 0) { + // reading offsets from ZK + handleOffsetFetchRequestV0(request) +} else if (version >= 1 && version <= 7) { + // reading offsets from Kafka + handleOffsetFetchRequestBetweenV1AndV7(request) +} else { + // batching offset reads for multiple groups starts with version 8 and greater + handleOffsetFetchRequestV8AndAbove(request) +} + } + + private def handleOffsetFetchRequestV0(request: RequestChannel.Request): Unit = { val header = request.header val offsetFetchRequest = request.body[OffsetFetchRequest] -def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], Seq[TopicPartition]) = - authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic) - def createResponse(requestThrottleMs: Int): AbstractResponse = { val offsetFetchResponse = -// reject the request if not authorized to the group + // reject the request if not authorized to the group Review comment: nit: revert 
indentation change ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -1358,17 +1367,241 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's only one broker, so no need to lookup the group coordinator // without describe permission on the topic, we shouldn't be able to fetch offsets -val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, false, null, false).build() +val offsetFetchRequest = createOffsetFetchRequestAllPartitions var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) -assertEquals(Errors.NONE, offsetFetchResponse.error) -assertTrue(offsetFetchResponse.responseData.isEmpty) +assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) +
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r663485656 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ## @@ -214,6 +321,10 @@ public Errors error() { return responseData; } +public Map responseData(String groupId) { Review comment: Yes, sounds good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r663485547 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ## @@ -154,14 +166,88 @@ public OffsetFetchResponse(int throttleTimeMs, Errors error, Map errors, Map> responseData) { Review comment: The other constructor without throttle time is for versions which didn't have throttle time. For newer constructors that will be used with newer versions, we shouldn't need that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r663485187 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java ## @@ -73,7 +73,10 @@ public String apiName() { public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set keys) { // Set the flag to false as for admin client request, // we don't need to wait for any pending offset state to clear. -return new OffsetFetchRequest.Builder(groupId.idValue, false, partitions, false); Review comment: We can leave the admin client changes for the next PR to keep the changes in this one small for 3.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r663350976 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java ## @@ -73,7 +73,10 @@ public String apiName() { public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set keys) { // Set the flag to false as for admin client request, // we don't need to wait for any pending offset state to clear. -return new OffsetFetchRequest.Builder(groupId.idValue, false, partitions, false); Review comment: Given that single groupid is a common pattern, we could just retain the old constructor as well. ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +319,10 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public List isAllPartitionsForGroup() { Review comment: Name of method suggests we are returning a boolean, but we are returning null. Why do we need a public method that always returns null? ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ## @@ -65,6 +69,8 @@ private final OffsetFetchResponseData data; private final Errors error; +private final Map groupLevelErrors = new HashMap<>(); +private final Map> groupToPartitionData = new HashMap<>(); Review comment: Why are we caching these when we have them in `data`? 
## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1308,29 +1308,41 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture> future) { -if (response.hasError()) { -Errors error = response.error(); -log.debug("Offset fetch failed: {}", error.message()); +Errors responseError = response.error(); +// check if error is null, if it is we are dealing with v8 response +if (responseError == null) { Review comment: We can move this to the response object, otherwise it is duplicated from above. ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -68,36 +74,117 @@ public Builder(String groupId, } this.data = new OffsetFetchRequestData() -.setGroupId(groupId) -.setRequireStable(requireStable) -.setTopics(topics); +.setGroupId(groupId) +.setRequireStable(requireStable) +.setTopics(topics); Review comment: nit: unnecessary indentation change ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1308,29 +1308,41 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture> future) { -if (response.hasError()) { -Errors error = response.error(); -log.debug("Offset fetch failed: {}", error.message()); +Errors responseError = response.error(); +// check if error is null, if it is we are dealing with v8 response +if (responseError == null) { +if (response.groupHasError(rebalanceConfig.groupId)) { +responseError = response.groupLevelError(rebalanceConfig.groupId); +} else { +responseError = Errors.NONE; +} +} +if (responseError != Errors.NONE) { +log.debug("Offset fetch failed: {}", responseError.message()); -if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { // just retry -future.raise(error); -} else if (error == Errors.NOT_COORDINATOR) { 
+future.raise(responseError); +} else if (responseError == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry -markCoordinatorUnknown(error); -future.raise(error); -} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { +markCoordinatorUnknown(responseError); +future.raise(responseError); +} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { -future.raise(new KafkaException("Unexpected error in