rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r663488495
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { - if (response.hasError()) { - Errors error = response.error(); - log.debug("Offset fetch failed: {}", error.message()); + Errors responseError = response.groupLevelError(rebalanceConfig.groupId); Review comment: We could just call this `error`, and then we won't need the remaining changes below. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ########## @@ -78,26 +85,107 @@ boolean isAllTopicPartitions() { return this.data.topics() == ALL_TOPIC_PARTITIONS; } + public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap, + boolean requireStable, Review comment: nit: indentation ########## File path: clients/src/test/java/org/apache/kafka/common/message/MessageTest.java ########## @@ -662,20 +669,179 @@ public void testOffsetFetchVersions() throws Exception { .setErrorCode(Errors.NOT_COORDINATOR.code()) .setThrottleTimeMs(10); for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - OffsetFetchResponseData responseData = response.get(); - if (version <= 1) { - responseData.setErrorCode(Errors.NONE.code()); + if (version < 8) { Review comment: As before, we should cover the `version >= 8` case as well. 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java ########## @@ -87,12 +87,16 @@ public String apiName() { Map<CoordinatorKey, Throwable> failed = new HashMap<>(); List<CoordinatorKey> unmapped = new ArrayList<>(); - if (response.error() != Errors.NONE) { - handleError(groupId, response.error(), failed, unmapped); + Errors responseError = response.groupLevelError(groupId.idValue); + if (responseError != Errors.NONE) { + handleError(groupId, responseError, failed, unmapped); } else { final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>(); - for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : - response.responseData().entrySet()) { + // if entry for group level response data is null, we are getting back an older version + // of the response Review comment: Is this comment required? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1255,81 +1256,143 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle an offset fetch request */ def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = { + val version = request.header.apiVersion + if (version == 0) { + // reading offsets from ZK + handleOffsetFetchRequestV0(request) + } else if (version >= 1 && version <= 7) { + // reading offsets from Kafka + handleOffsetFetchRequestBetweenV1AndV7(request) + } else { + // batching offset reads for multiple groups starts with version 8 and greater + handleOffsetFetchRequestV8AndAbove(request) + } + } + + private def handleOffsetFetchRequestV0(request: RequestChannel.Request): Unit = { val header = request.header val offsetFetchRequest = request.body[OffsetFetchRequest] - def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], Seq[TopicPartition]) = - authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, seq)(_.topic) - def createResponse(requestThrottleMs: Int): 
AbstractResponse = { val offsetFetchResponse = - // reject the request if not authorized to the group + // reject the request if not authorized to the group Review comment: nit: revert indentation change ########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -1358,17 +1367,241 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's only one broker, so no need to lookup the group coordinator // without describe permission on the topic, we shouldn't be able to fetch offsets - val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, false, null, false).build() + val offsetFetchRequest = createOffsetFetchRequestAllPartitions var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) - assertEquals(Errors.NONE, offsetFetchResponse.error) - assertTrue(offsetFetchResponse.responseData.isEmpty) + assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) + assertTrue(offsetFetchResponse.responseData(group).isEmpty) // now add describe permission on the topic and verify that the offset can be fetched addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) - assertEquals(Errors.NONE, offsetFetchResponse.error) - assertTrue(offsetFetchResponse.responseData.containsKey(tp)) - assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset) + assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) + assertTrue(offsetFetchResponse.responseData(group).containsKey(tp)) + assertEquals(offset, offsetFetchResponse.responseData(group).get(tp).offset) + } + + @Test + def testOffsetFetchMultipleGroupsAuthorization(): Unit = { + val groupOne = "group1" + val groupOneResource = new ResourcePattern(GROUP, groupOne, LITERAL) + val groupTwo = "group2" + val groupTwoResource = new ResourcePattern(GROUP, 
groupTwo, LITERAL) + val groupThree = "group3" + val groupThreeResource = new ResourcePattern(GROUP, groupThree, LITERAL) + val groupFour = "group4" + val groupFourResource = new ResourcePattern(GROUP, groupFour, LITERAL) + val groupFive = "group5" + val groupFiveResource = new ResourcePattern(GROUP, groupFive, LITERAL) + + val topic1 = "topic1" + val topic1List = singletonList(new TopicPartition(topic1, 0)) + val topicOneResource = new ResourcePattern(TOPIC, topic1, LITERAL) + val topic2 = "topic2" + val topic1And2List = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) + val topicTwoResource = new ResourcePattern(TOPIC, topic2, LITERAL) + val topic3 = "topic3" + val allTopicsList = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1), + new TopicPartition(topic3, 0), + new TopicPartition(topic3, 1), + new TopicPartition(topic3, 2)) + val topicThreeResource = new ResourcePattern(TOPIC, topic3, LITERAL) + + // create group to partition map to build batched offsetFetch request + val groupToPartitionMap = new util.HashMap[String, util.List[TopicPartition]]() + groupToPartitionMap.put(groupOne, topic1List) + groupToPartitionMap.put(groupTwo, topic1And2List) + groupToPartitionMap.put(groupThree, allTopicsList) + groupToPartitionMap.put(groupFour, null) + groupToPartitionMap.put(groupFive, null) + + createTopic(topic1) + createTopic(topic2, numPartitions = 2) + createTopic(topic3, numPartitions = 3) + + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupOneResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupTwoResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupThreeResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, 
ALLOW)), groupFourResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupFiveResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicOneResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicTwoResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicThreeResource) + + val offset = 15L + val leaderEpoch: Optional[Integer] = Optional.of(1) + val metadata = "metadata" + val topicOneOffsets = topic1List.asScala.map{ Review comment: nit: space before braces (multiple places) ########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ########## @@ -78,26 +85,107 @@ boolean isAllTopicPartitions() { return this.data.topics() == ALL_TOPIC_PARTITIONS; } + public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap, + boolean requireStable, + boolean throwOnFetchStableOffsetsUnsupported) { + super(ApiKeys.OFFSET_FETCH); + + List<OffsetFetchRequestGroup> groups = new ArrayList<>(); + for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) { + String groupName = entry.getKey(); + List<TopicPartition> tpList = entry.getValue(); + final List<OffsetFetchRequestTopics> topics; + if (tpList != null) { + Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap = + new HashMap<>(); + for (TopicPartition topicPartition : tpList) { + String topicName = topicPartition.topic(); + OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault( + topicName, new OffsetFetchRequestTopics().setName(topicName)); + topic.partitionIndexes().add(topicPartition.partition()); + offsetFetchRequestTopicMap.put(topicName, topic); + } + topics = new ArrayList<>(offsetFetchRequestTopicMap.values()); + } else { + topics = ALL_TOPIC_PARTITIONS_BATCH; + } + groups.add(new 
OffsetFetchRequestGroup() + .setGroupId(groupName) + .setTopics(topics)); + } + this.data = new OffsetFetchRequestData() + .setGroupIds(groups) + .setRequireStable(requireStable); + this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; + } + @Override public OffsetFetchRequest build(short version) { if (isAllTopicPartitions() && version < 2) { throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " + "v" + version + ", but we need v2 or newer to request all topic partitions."); } - + if (data.groupIds().size() > 1 && version < 8) { + throw new NoBatchedOffsetFetchRequestException("Broker does not support" + + " batching groups for fetch offset request on version " + version); + } if (data.requireStable() && version < 7) { if (throwOnFetchStableOffsetsUnsupported) { throw new UnsupportedVersionException("Broker unexpectedly " + "doesn't support requireStable flag on version " + version); } else { log.trace("Fallback the requireStable flag to false as broker " + - "only supports OffsetFetchRequest version {}. Need " + - "v7 or newer to enable this feature", version); + "only supports OffsetFetchRequest version {}. 
Need " + + "v7 or newer to enable this feature", version); return new OffsetFetchRequest(data.setRequireStable(false), version); } } - + if (version < 8) { + OffsetFetchRequestData oldDataFormat = null; + if (!data.groupIds().isEmpty()) { + OffsetFetchRequestGroup group = data.groupIds().get(0); + String groupName = group.groupId(); + List<OffsetFetchRequestTopics> topics = group.topics(); + List<OffsetFetchRequestTopic> oldFormatTopics = null; + if (topics != null) { + oldFormatTopics = topics + .stream() + .map(t -> + new OffsetFetchRequestTopic() + .setName(t.name()) + .setPartitionIndexes(t.partitionIndexes())) + .collect(Collectors.toList()); + } + oldDataFormat = new OffsetFetchRequestData() + .setGroupId(groupName) + .setTopics(oldFormatTopics) + .setRequireStable(data.requireStable()); + } + return new OffsetFetchRequest(oldDataFormat == null ? data : oldDataFormat, version); + } + // version 8 but have used old format of request, convert to version 8 of request Review comment: May be better to put the following code in an else statement and put the comment at the start of the `if` block since we may be converting for version < 8 as well in the block above. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java ########## @@ -76,62 +73,169 @@ public void testConstructor() { } for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - OffsetFetchRequest request = builder.build(version); - assertFalse(request.isAllPartitions()); - assertEquals(groupId, request.groupId()); - assertEquals(partitions, request.partitions()); - - OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); - assertEquals(Errors.NONE, response.error()); - assertFalse(response.hasError()); - assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 
3 : 1), response.errorCounts(), - "Incorrect error count for version " + version); - - if (version <= 1) { - assertEquals(expectedData, response.responseData()); + if (version < 8) { + builder = new OffsetFetchRequest.Builder( + group1, + false, + partitions, + false); + assertFalse(builder.isAllTopicPartitions()); + OffsetFetchRequest request = builder.build(version); + assertFalse(request.isAllPartitions()); + assertEquals(group1, request.groupId()); + assertEquals(partitions, request.partitions()); + + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.error()); + assertFalse(response.hasError()); + assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 3 : 1), response.errorCounts(), + "Incorrect error count for version " + version); + + if (version <= 1) { + assertEquals(expectedData, response.responseDataV0ToV7()); + } + + if (version >= 3) { + assertEquals(throttleTimeMs, response.throttleTimeMs()); + } else { + assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); + } + } else { + builder = new Builder(Collections.singletonMap(group1, partitions), false, false); + OffsetFetchRequest request = builder.build(version); + Map<String, List<TopicPartition>> groupToPartitionMap = + request.groupIdsToPartitions(); + Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = + request.groupIdsToTopics(); + assertFalse(request.isAllPartitionsForGroup(group1)); + assertTrue(groupToPartitionMap.containsKey(group1) && groupToTopicMap.containsKey( + group1)); + assertEquals(partitions, groupToPartitionMap.get(group1)); + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.groupLevelError(group1)); + assertFalse(response.groupHasError(group1)); + assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts(), + "Incorrect error count for version " + version); + 
assertEquals(throttleTimeMs, response.throttleTimeMs()); } + } + } + + @Test + public void testConstructorWithMultipleGroups() { + List<TopicPartition> topic1Partitions = Arrays.asList( + new TopicPartition(topicOne, partitionOne), + new TopicPartition(topicOne, partitionTwo)); + List<TopicPartition> topic2Partitions = Arrays.asList( + new TopicPartition(topicTwo, partitionOne), + new TopicPartition(topicTwo, partitionTwo)); + List<TopicPartition> topic3Partitions = Arrays.asList( + new TopicPartition(topicThree, partitionOne), + new TopicPartition(topicThree, partitionTwo)); + Map<String, List<TopicPartition>> groupToTp = new HashMap<>(); + groupToTp.put(group1, topic1Partitions); + groupToTp.put(group2, topic2Partitions); + groupToTp.put(group3, topic3Partitions); + groupToTp.put(group4, null); + groupToTp.put(group5, null); + int throttleTimeMs = 10; - if (version >= 3) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { + if (version >= 8) { + builder = new Builder(groupToTp, false, false); + OffsetFetchRequest request = builder.build(version); + Map<String, List<TopicPartition>> groupToPartitionMap = + request.groupIdsToPartitions(); + Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = + request.groupIdsToTopics(); + assertEquals(groupToTp.keySet(), groupToTopicMap.keySet()); + assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet()); + assertFalse(request.isAllPartitionsForGroup(group1)); + assertFalse(request.isAllPartitionsForGroup(group2)); + assertFalse(request.isAllPartitionsForGroup(group3)); + assertTrue(request.isAllPartitionsForGroup(group4)); + assertTrue(request.isAllPartitionsForGroup(group5)); + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.groupLevelError(group1)); + assertEquals(Errors.NONE, response.groupLevelError(group2)); + assertEquals(Errors.NONE, response.groupLevelError(group3)); + assertEquals(Errors.NONE, 
response.groupLevelError(group4)); + assertEquals(Errors.NONE, response.groupLevelError(group5)); Review comment: List of groups will enable us to use loop here. ########## File path: clients/src/test/java/org/apache/kafka/common/message/MessageTest.java ########## @@ -633,17 +638,19 @@ public void testOffsetFetchVersions() throws Exception { .setRequireStable(true); for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - final short finalVersion = version; - if (version < 2) { - assertThrows(NullPointerException.class, () -> testAllMessageRoundTripsFromVersion(finalVersion, allPartitionData)); - } else { - testAllMessageRoundTripsFromVersion(version, allPartitionData); - } - - if (version < 7) { - assertThrows(UnsupportedVersionException.class, () -> testAllMessageRoundTripsFromVersion(finalVersion, requireStableData)); - } else { - testAllMessageRoundTripsFromVersion(finalVersion, requireStableData); + if (version < 8) { Review comment: We should probably just add this test for v8 and above as well. The other v8 test can just focus on multiple groups, this one can test the common case of single group that is used by consumers. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java ########## @@ -76,62 +73,169 @@ public void testConstructor() { } for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - OffsetFetchRequest request = builder.build(version); - assertFalse(request.isAllPartitions()); - assertEquals(groupId, request.groupId()); - assertEquals(partitions, request.partitions()); - - OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); - assertEquals(Errors.NONE, response.error()); - assertFalse(response.hasError()); - assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 
3 : 1), response.errorCounts(), - "Incorrect error count for version " + version); - - if (version <= 1) { - assertEquals(expectedData, response.responseData()); + if (version < 8) { + builder = new OffsetFetchRequest.Builder( + group1, + false, + partitions, + false); + assertFalse(builder.isAllTopicPartitions()); + OffsetFetchRequest request = builder.build(version); + assertFalse(request.isAllPartitions()); + assertEquals(group1, request.groupId()); + assertEquals(partitions, request.partitions()); + + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.error()); + assertFalse(response.hasError()); + assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 3 : 1), response.errorCounts(), + "Incorrect error count for version " + version); + + if (version <= 1) { + assertEquals(expectedData, response.responseDataV0ToV7()); + } + + if (version >= 3) { + assertEquals(throttleTimeMs, response.throttleTimeMs()); + } else { + assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); + } + } else { + builder = new Builder(Collections.singletonMap(group1, partitions), false, false); + OffsetFetchRequest request = builder.build(version); + Map<String, List<TopicPartition>> groupToPartitionMap = + request.groupIdsToPartitions(); + Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = + request.groupIdsToTopics(); + assertFalse(request.isAllPartitionsForGroup(group1)); + assertTrue(groupToPartitionMap.containsKey(group1) && groupToTopicMap.containsKey( + group1)); + assertEquals(partitions, groupToPartitionMap.get(group1)); + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.groupLevelError(group1)); + assertFalse(response.groupHasError(group1)); + assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts(), + "Incorrect error count for version " + version); + 
assertEquals(throttleTimeMs, response.throttleTimeMs()); } + } + } + + @Test + public void testConstructorWithMultipleGroups() { + List<TopicPartition> topic1Partitions = Arrays.asList( + new TopicPartition(topicOne, partitionOne), + new TopicPartition(topicOne, partitionTwo)); + List<TopicPartition> topic2Partitions = Arrays.asList( + new TopicPartition(topicTwo, partitionOne), + new TopicPartition(topicTwo, partitionTwo)); + List<TopicPartition> topic3Partitions = Arrays.asList( + new TopicPartition(topicThree, partitionOne), + new TopicPartition(topicThree, partitionTwo)); + Map<String, List<TopicPartition>> groupToTp = new HashMap<>(); + groupToTp.put(group1, topic1Partitions); + groupToTp.put(group2, topic2Partitions); + groupToTp.put(group3, topic3Partitions); + groupToTp.put(group4, null); + groupToTp.put(group5, null); + int throttleTimeMs = 10; - if (version >= 3) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { + if (version >= 8) { + builder = new Builder(groupToTp, false, false); + OffsetFetchRequest request = builder.build(version); + Map<String, List<TopicPartition>> groupToPartitionMap = + request.groupIdsToPartitions(); + Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = + request.groupIdsToTopics(); + assertEquals(groupToTp.keySet(), groupToTopicMap.keySet()); + assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet()); + assertFalse(request.isAllPartitionsForGroup(group1)); + assertFalse(request.isAllPartitionsForGroup(group2)); + assertFalse(request.isAllPartitionsForGroup(group3)); + assertTrue(request.isAllPartitionsForGroup(group4)); + assertTrue(request.isAllPartitionsForGroup(group5)); + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.groupLevelError(group1)); + assertEquals(Errors.NONE, response.groupLevelError(group2)); + assertEquals(Errors.NONE, response.groupLevelError(group3)); + assertEquals(Errors.NONE, 
response.groupLevelError(group4)); + assertEquals(Errors.NONE, response.groupLevelError(group5)); + assertFalse(response.groupHasError(group1)); + assertFalse(response.groupHasError(group2)); + assertFalse(response.groupHasError(group3)); + assertFalse(response.groupHasError(group4)); + assertFalse(response.groupHasError(group5)); + assertEquals(Collections.singletonMap(Errors.NONE, 5), response.errorCounts(), + "Incorrect error count for version " + version); assertEquals(throttleTimeMs, response.throttleTimeMs()); - } else { - assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); } } } @Test - public void testConstructorFailForUnsupportedRequireStable() { + public void testBuildThrowForUnsupportedBatchRequest() { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - // The builder needs to be initialized every cycle as the internal data `requireStable` flag is flipped. - builder = new OffsetFetchRequest.Builder(groupId, true, null, false); - final short finalVersion = version; - if (version < 2) { - assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion)); - } else { - OffsetFetchRequest request = builder.build(finalVersion); - assertEquals(groupId, request.groupId()); - assertNull(request.partitions()); - assertTrue(request.isAllPartitions()); - if (version < 7) { - assertFalse(request.requireStable()); - } else { - assertTrue(request.requireStable()); - } + if (version < 8) { + Map<String, List<TopicPartition>> groupPartitionMap = new HashMap<>(); + groupPartitionMap.put(group1, null); + groupPartitionMap.put(group2, null); + builder = new Builder(groupPartitionMap, true, false); + final short finalVersion = version; + assertThrows(NoBatchedOffsetFetchRequestException.class, () -> builder.build(finalVersion)); } } } @Test - public void testBuildThrowForUnsupportedRequireStable() { + public void testConstructorFailForUnsupportedRequireStable() { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - builder = new 
OffsetFetchRequest.Builder(groupId, true, null, true); - if (version < 7) { + if (version < 8) { + // The builder needs to be initialized every cycle as the internal data `requireStable` flag is flipped. + builder = new OffsetFetchRequest.Builder(group1, true, null, false); final short finalVersion = version; - assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion)); + if (version < 2) { + assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion)); + } else { + OffsetFetchRequest request = builder.build(finalVersion); + assertEquals(group1, request.groupId()); + assertNull(request.partitions()); + assertTrue(request.isAllPartitions()); + if (version < 7) { + assertFalse(request.requireStable()); + } else { + assertTrue(request.requireStable()); + } + } } else { + builder = new Builder(Collections.singletonMap(group1, null), true, false); OffsetFetchRequest request = builder.build(version); + Map<String, List<TopicPartition>> groupToPartitionMap = + request.groupIdsToPartitions(); + Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = + request.groupIdsToTopics(); + assertTrue(groupToPartitionMap.containsKey(group1) && groupToTopicMap.containsKey( + group1)); + assertNull(groupToPartitionMap.get(group1)); + assertTrue(request.isAllPartitionsForGroup(group1)); assertTrue(request.requireStable()); } } } + + @Test + public void testBuildThrowForUnsupportedRequireStable() { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { + if (version < 8) { Review comment: As before, it will be good to include the same logic for `version >=8` as well. Based on the number of tests that use the pattern of `ApiKeys.OFFSET_FETCH.allVersions()` followed by `if (version < 8)`, it may be good to add the list of versions < 8 and use that in these cases if we require this pattern. 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ########## @@ -154,14 +167,74 @@ public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition, this.error = error; } + /** + * Constructor without throttle time for version 8 and above. + * @param errors Error code on a per group level basis + * @param responseData Fetched offset information grouped group id + */ + public OffsetFetchResponse(Map<String, Errors> errors, Map<String, Map<TopicPartition, PartitionData>> responseData) { + this(DEFAULT_THROTTLE_TIME, errors, responseData); + } + + /** + * Constructor with throttle time for version 8 and above. + * @param throttleTimeMs The time in milliseconds that this response was throttled + * @param errors Potential coordinator or group level error code + * @param responseData Fetched offset information grouped by topic-partition and by group + */ + public OffsetFetchResponse(int throttleTimeMs, Map<String, Errors> errors, Map<String, + Map<TopicPartition, PartitionData>> responseData) { Review comment: nit: indentation ########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ########## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } + public boolean isAllPartitionsForGroup(String groupId) { + OffsetFetchRequestGroup group = data + .groupIds() + .stream() + .filter(g -> g.groupId().equals(groupId)) + .collect(toSingleton()); Review comment: This is going to throw IllegalStateException if a group is included multiple times in a request. If that is an invalid request, we should throw InvalidRequestException before it gets here to ensure we don't return UNKNOWN_SERVER_ERROR. More likely, it is valid and we should not throw an exception. 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ########## @@ -113,8 +120,14 @@ public int hashCode() { } } + public OffsetFetchResponse(OffsetFetchResponseData data) { + super(ApiKeys.OFFSET_FETCH); + this.data = data; + this.error = null; + } + /** - * Constructor for all versions without throttle time. + * Constructor without throttle time for version 0 to version 7. Review comment: It is not actually for version 0 to 7, it is for the versions without throttle time. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java ########## @@ -44,25 +46,20 @@ private final int partitionOne = 1; private final String topicTwo = "topic2"; private final int partitionTwo = 2; - private final String groupId = "groupId"; + private final String topicThree = "topic3"; + private final String group1 = "group1"; + private final String group2 = "group2"; + private final String group3 = "group3"; + private final String group4 = "group4"; + private final String group5 = "group5"; Review comment: Maybe better to define `List<String> groups` to avoid repetition here and in the test logic ########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ########## @@ -185,21 +249,46 @@ public boolean hasError() { return error != Errors.NONE; } + public boolean groupHasError(String groupId) { + return groupLevelErrors.get(groupId) != Errors.NONE; + } + public Errors error() { return error; } + public Errors groupLevelError(String groupId) { + if (error != null) { + return error; + } + return groupLevelErrors.get(groupId); + } + @Override public Map<Errors, Integer> errorCounts() { Map<Errors, Integer> counts = new HashMap<>(); - updateErrorCounts(counts, error); - data.topics().forEach(topic -> - topic.partitions().forEach(partition -> + if (!groupLevelErrors.isEmpty()) { + // built response with v8 or above + for (Map.Entry<String, Errors> entry : 
groupLevelErrors.entrySet()) { + updateErrorCounts(counts, entry.getValue()); + } + for (OffsetFetchResponseGroup group : data.groupIds()) { + group.topics().forEach(topic -> + topic.partitions().forEach(partition -> updateErrorCounts(counts, Errors.forCode(partition.errorCode())))); + } + } else { + // built response with v0-v7 + updateErrorCounts(counts, error); + data.topics().forEach(topic -> + topic.partitions().forEach(partition -> + updateErrorCounts(counts, Errors.forCode(partition.errorCode())))); + } return counts; } - public Map<TopicPartition, PartitionData> responseData() { + //public for testing purposes + public Map<TopicPartition, PartitionData> responseDataV0ToV7() { Review comment: package-private should be sufficient for unit tests? May also be possible keep this private and call `responseData()` directly to ensure the right data is returned based on version. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ########## @@ -214,6 +303,46 @@ public Errors error() { return responseData; } + private Map<TopicPartition, PartitionData> buildResponseData(String groupId) { + Map<TopicPartition, PartitionData> responseData = new HashMap<>(); + OffsetFetchResponseGroup group = data + .groupIds() + .stream() + .filter(g -> g.groupId().equals(groupId)) + .collect(toSingleton()); + for (OffsetFetchResponseTopics topic : group.topics()) { + for (OffsetFetchResponsePartitions partition : topic.partitions()) { + responseData.put(new TopicPartition(topic.name(), partition.partitionIndex()), + new PartitionData(partition.committedOffset(), + RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()), + partition.metadata(), + Errors.forCode(partition.errorCode())) + ); + } + } + return responseData; + } + + // Custom collector to filter a single element + private <T> Collector<T, ?, T> toSingleton() { Review comment: See earlier note about this method. 
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { - if (response.hasError()) { - Errors error = response.error(); - log.debug("Offset fetch failed: {}", error.message()); + Errors responseError = response.groupLevelError(rebalanceConfig.groupId); + if (responseError != Errors.NONE) { + log.debug("Offset fetch failed: {}", responseError.message()); - if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { + if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { // just retry - future.raise(error); - } else if (error == Errors.NOT_COORDINATOR) { + future.raise(responseError); + } else if (responseError == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry - markCoordinatorUnknown(error); - future.raise(error); - } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + markCoordinatorUnknown(responseError); + future.raise(responseError); + } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { - future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); + future.raise(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); } return; } Set<String> unauthorizedTopics = null; - Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size()); + Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = Review comment: as before, rename responseData? 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java ########## @@ -87,12 +87,16 @@ public String apiName() { Map<CoordinatorKey, Throwable> failed = new HashMap<>(); List<CoordinatorKey> unmapped = new ArrayList<>(); - if (response.error() != Errors.NONE) { - handleError(groupId, response.error(), failed, unmapped); + Errors responseError = response.groupLevelError(groupId.idValue); + if (responseError != Errors.NONE) { + handleError(groupId, responseError, failed, unmapped); } else { final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>(); - for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : - response.responseData().entrySet()) { + // if entry for group level response data is null, we are getting back an older version + // of the response + Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = + response.responseData(groupId.idValue); Review comment: Perhaps call the method and variable partitionData or offsetData rather than responseData since responseData is too close to response.data? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ########## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } + public boolean isAllPartitionsForGroup(String groupId) { + OffsetFetchRequestGroup group = data + .groupIds() + .stream() + .filter(g -> g.groupId().equals(groupId)) + .collect(toSingleton()); + return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; + } + + // Custom collector to filter a single element + private <T> Collector<T, ?, T> toSingleton() { Review comment: We seem to have two copies of this method in the PR. We need to decide first whether this logic is correct. If we really do want the method, we need to make it shared. 
########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -1358,17 +1367,241 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's only one broker, so no need to lookup the group coordinator // without describe permission on the topic, we shouldn't be able to fetch offsets - val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, false, null, false).build() + val offsetFetchRequest = createOffsetFetchRequestAllPartitions var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) - assertEquals(Errors.NONE, offsetFetchResponse.error) - assertTrue(offsetFetchResponse.responseData.isEmpty) + assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) + assertTrue(offsetFetchResponse.responseData(group).isEmpty) // now add describe permission on the topic and verify that the offset can be fetched addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) - assertEquals(Errors.NONE, offsetFetchResponse.error) - assertTrue(offsetFetchResponse.responseData.containsKey(tp)) - assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset) + assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) + assertTrue(offsetFetchResponse.responseData(group).containsKey(tp)) + assertEquals(offset, offsetFetchResponse.responseData(group).get(tp).offset) + } + + @Test + def testOffsetFetchMultipleGroupsAuthorization(): Unit = { + val groupOne = "group1" + val groupOneResource = new ResourcePattern(GROUP, groupOne, LITERAL) + val groupTwo = "group2" + val groupTwoResource = new ResourcePattern(GROUP, groupTwo, LITERAL) + val groupThree = "group3" + val groupThreeResource = new ResourcePattern(GROUP, groupThree, LITERAL) + val groupFour = "group4" + val groupFourResource = new ResourcePattern(GROUP, 
groupFour, LITERAL) + val groupFive = "group5" + val groupFiveResource = new ResourcePattern(GROUP, groupFive, LITERAL) + + val topic1 = "topic1" + val topic1List = singletonList(new TopicPartition(topic1, 0)) + val topicOneResource = new ResourcePattern(TOPIC, topic1, LITERAL) + val topic2 = "topic2" + val topic1And2List = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) + val topicTwoResource = new ResourcePattern(TOPIC, topic2, LITERAL) + val topic3 = "topic3" + val allTopicsList = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1), + new TopicPartition(topic3, 0), + new TopicPartition(topic3, 1), + new TopicPartition(topic3, 2)) + val topicThreeResource = new ResourcePattern(TOPIC, topic3, LITERAL) + + // create group to partition map to build batched offsetFetch request + val groupToPartitionMap = new util.HashMap[String, util.List[TopicPartition]]() + groupToPartitionMap.put(groupOne, topic1List) + groupToPartitionMap.put(groupTwo, topic1And2List) + groupToPartitionMap.put(groupThree, allTopicsList) + groupToPartitionMap.put(groupFour, null) + groupToPartitionMap.put(groupFive, null) + + createTopic(topic1) + createTopic(topic2, numPartitions = 2) + createTopic(topic3, numPartitions = 3) + + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupOneResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupTwoResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupThreeResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupFourResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupFiveResource) + addAndVerifyAcls(Set(new 
AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicOneResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicTwoResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicThreeResource) Review comment: Looks like they are all the same with different resources; better to put them in a collection and call `addAndVerifyAcls` over the collection ########## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ########## @@ -1580,21 +1623,83 @@ private OffsetCommitResponse createOffsetCommitResponse() { } private OffsetFetchRequest createOffsetFetchRequest(int version, boolean requireStable) { - return new OffsetFetchRequest.Builder("group1", requireStable, Collections.singletonList(new TopicPartition("test11", 1)), false) + if (version < 8) { + return new OffsetFetchRequest.Builder( + "group1", + requireStable, + Collections.singletonList( + new TopicPartition("test11", 1)), + false) .build((short) version); + } + return new OffsetFetchRequest.Builder( + Collections.singletonMap( + "group1", + Collections.singletonList( + new TopicPartition("test11", 1))), Review comment: nit: unnecessary newlines ########## File path: clients/src/main/resources/common/message/OffsetFetchResponse.json ########## @@ -30,30 +30,57 @@ // Version 6 is the first flexible version. // // Version 7 adds pending offset commit as new error response on partition level. - "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, - { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+", + { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7", "about": "The responses per topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name." }, - { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+", + { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7", "about": "The responses per partition", "fields": [ - { "name": "PartitionIndex", "type": "int32", "versions": "0+", + { "name": "PartitionIndex", "type": "int32", "versions": "0-7", "about": "The partition index." }, - { "name": "CommittedOffset", "type": "int64", "versions": "0+", + { "name": "CommittedOffset", "type": "int64", "versions": "0-7", "about": "The committed message offset." }, - { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1", + { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1", "ignorable": true, "about": "The leader epoch." }, - { "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+", + { "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7", "about": "The partition metadata." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", + { "name": "ErrorCode", "type": "int16", "versions": "0-7", "about": "The error code, or 0 if there was no error." } ]} ]}, - { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true, - "about": "The top-level error code, or 0 if there was no error." } + { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true, + "about": "The top-level error code, or 0 if there was no error." 
}, + {"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": "8+", + "about": "The responses per group id.", "fields": [ + { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId", + "about": "The group ID." }, + { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+", + "about": "The responses per topic.", "fields": [ + { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+", + "about": "The responses per partition", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "8+", + "about": "The partition index." }, + { "name": "CommittedOffset", "type": "int64", "versions": "8+", + "about": "The committed message offset." }, + { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "8+", "default": "-1", + "ignorable": true, "about": "The leader epoch." }, + { "name": "Metadata", "type": "string", "versions": "8+", "nullableVersions": "8+", + "about": "The partition metadata." }, + { "name": "ErrorCode", "type": "int16", "versions": "8+", + "about": "The partition-level error code, or 0 if there was no error." 
} + ]} + ]}, + { "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0", "ignorable": true, Review comment: Don't think this should be ignorable ########## File path: clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java ########## @@ -76,62 +73,169 @@ public void testConstructor() { } for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - OffsetFetchRequest request = builder.build(version); - assertFalse(request.isAllPartitions()); - assertEquals(groupId, request.groupId()); - assertEquals(partitions, request.partitions()); - - OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); - assertEquals(Errors.NONE, response.error()); - assertFalse(response.hasError()); - assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 3 : 1), response.errorCounts(), - "Incorrect error count for version " + version); - - if (version <= 1) { - assertEquals(expectedData, response.responseData()); + if (version < 8) { + builder = new OffsetFetchRequest.Builder( + group1, + false, + partitions, + false); + assertFalse(builder.isAllTopicPartitions()); + OffsetFetchRequest request = builder.build(version); + assertFalse(request.isAllPartitions()); + assertEquals(group1, request.groupId()); + assertEquals(partitions, request.partitions()); + + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.error()); + assertFalse(response.hasError()); + assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 
3 : 1), response.errorCounts(), + "Incorrect error count for version " + version); + + if (version <= 1) { + assertEquals(expectedData, response.responseDataV0ToV7()); + } + + if (version >= 3) { + assertEquals(throttleTimeMs, response.throttleTimeMs()); + } else { + assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); + } + } else { + builder = new Builder(Collections.singletonMap(group1, partitions), false, false); + OffsetFetchRequest request = builder.build(version); + Map<String, List<TopicPartition>> groupToPartitionMap = + request.groupIdsToPartitions(); + Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = + request.groupIdsToTopics(); + assertFalse(request.isAllPartitionsForGroup(group1)); + assertTrue(groupToPartitionMap.containsKey(group1) && groupToTopicMap.containsKey( + group1)); + assertEquals(partitions, groupToPartitionMap.get(group1)); + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.groupLevelError(group1)); + assertFalse(response.groupHasError(group1)); + assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts(), + "Incorrect error count for version " + version); + assertEquals(throttleTimeMs, response.throttleTimeMs()); } + } + } + + @Test + public void testConstructorWithMultipleGroups() { + List<TopicPartition> topic1Partitions = Arrays.asList( + new TopicPartition(topicOne, partitionOne), + new TopicPartition(topicOne, partitionTwo)); + List<TopicPartition> topic2Partitions = Arrays.asList( + new TopicPartition(topicTwo, partitionOne), + new TopicPartition(topicTwo, partitionTwo)); + List<TopicPartition> topic3Partitions = Arrays.asList( + new TopicPartition(topicThree, partitionOne), + new TopicPartition(topicThree, partitionTwo)); + Map<String, List<TopicPartition>> groupToTp = new HashMap<>(); + groupToTp.put(group1, topic1Partitions); + groupToTp.put(group2, topic2Partitions); + groupToTp.put(group3, topic3Partitions); + 
groupToTp.put(group4, null); + groupToTp.put(group5, null); + int throttleTimeMs = 10; - if (version >= 3) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { + if (version >= 8) { + builder = new Builder(groupToTp, false, false); + OffsetFetchRequest request = builder.build(version); + Map<String, List<TopicPartition>> groupToPartitionMap = + request.groupIdsToPartitions(); + Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap = + request.groupIdsToTopics(); + assertEquals(groupToTp.keySet(), groupToTopicMap.keySet()); + assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet()); + assertFalse(request.isAllPartitionsForGroup(group1)); + assertFalse(request.isAllPartitionsForGroup(group2)); + assertFalse(request.isAllPartitionsForGroup(group3)); + assertTrue(request.isAllPartitionsForGroup(group4)); + assertTrue(request.isAllPartitionsForGroup(group5)); + OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); + assertEquals(Errors.NONE, response.groupLevelError(group1)); + assertEquals(Errors.NONE, response.groupLevelError(group2)); + assertEquals(Errors.NONE, response.groupLevelError(group3)); + assertEquals(Errors.NONE, response.groupLevelError(group4)); + assertEquals(Errors.NONE, response.groupLevelError(group5)); + assertFalse(response.groupHasError(group1)); + assertFalse(response.groupHasError(group2)); + assertFalse(response.groupHasError(group3)); + assertFalse(response.groupHasError(group4)); + assertFalse(response.groupHasError(group5)); + assertEquals(Collections.singletonMap(Errors.NONE, 5), response.errorCounts(), + "Incorrect error count for version " + version); assertEquals(throttleTimeMs, response.throttleTimeMs()); - } else { - assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); } } } @Test - public void testConstructorFailForUnsupportedRequireStable() { + public void testBuildThrowForUnsupportedBatchRequest() { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - // The 
builder needs to be initialized every cycle as the internal data `requireStable` flag is flipped. - builder = new OffsetFetchRequest.Builder(groupId, true, null, false); - final short finalVersion = version; - if (version < 2) { - assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion)); - } else { - OffsetFetchRequest request = builder.build(finalVersion); - assertEquals(groupId, request.groupId()); - assertNull(request.partitions()); - assertTrue(request.isAllPartitions()); - if (version < 7) { - assertFalse(request.requireStable()); - } else { - assertTrue(request.requireStable()); - } + if (version < 8) { + Map<String, List<TopicPartition>> groupPartitionMap = new HashMap<>(); + groupPartitionMap.put(group1, null); + groupPartitionMap.put(group2, null); + builder = new Builder(groupPartitionMap, true, false); + final short finalVersion = version; + assertThrows(NoBatchedOffsetFetchRequestException.class, () -> builder.build(finalVersion)); Review comment: As before, it will be good to include test for `version >=8` as well ########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -1358,17 +1367,241 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's only one broker, so no need to lookup the group coordinator // without describe permission on the topic, we shouldn't be able to fetch offsets - val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, false, null, false).build() + val offsetFetchRequest = createOffsetFetchRequestAllPartitions var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) - assertEquals(Errors.NONE, offsetFetchResponse.error) - assertTrue(offsetFetchResponse.responseData.isEmpty) + assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) + assertTrue(offsetFetchResponse.responseData(group).isEmpty) // now add describe permission on the topic and verify that the offset can be fetched 
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) - assertEquals(Errors.NONE, offsetFetchResponse.error) - assertTrue(offsetFetchResponse.responseData.containsKey(tp)) - assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset) + assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) + assertTrue(offsetFetchResponse.responseData(group).containsKey(tp)) + assertEquals(offset, offsetFetchResponse.responseData(group).get(tp).offset) + } + + @Test + def testOffsetFetchMultipleGroupsAuthorization(): Unit = { + val groupOne = "group1" + val groupOneResource = new ResourcePattern(GROUP, groupOne, LITERAL) + val groupTwo = "group2" + val groupTwoResource = new ResourcePattern(GROUP, groupTwo, LITERAL) + val groupThree = "group3" + val groupThreeResource = new ResourcePattern(GROUP, groupThree, LITERAL) + val groupFour = "group4" + val groupFourResource = new ResourcePattern(GROUP, groupFour, LITERAL) + val groupFive = "group5" + val groupFiveResource = new ResourcePattern(GROUP, groupFive, LITERAL) + + val topic1 = "topic1" + val topic1List = singletonList(new TopicPartition(topic1, 0)) + val topicOneResource = new ResourcePattern(TOPIC, topic1, LITERAL) + val topic2 = "topic2" + val topic1And2List = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) + val topicTwoResource = new ResourcePattern(TOPIC, topic2, LITERAL) + val topic3 = "topic3" + val allTopicsList = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1), + new TopicPartition(topic3, 0), + new TopicPartition(topic3, 1), + new TopicPartition(topic3, 2)) + val topicThreeResource = new ResourcePattern(TOPIC, topic3, LITERAL) + + // create group to partition map to build batched offsetFetch request + val 
groupToPartitionMap = new util.HashMap[String, util.List[TopicPartition]]() + groupToPartitionMap.put(groupOne, topic1List) + groupToPartitionMap.put(groupTwo, topic1And2List) + groupToPartitionMap.put(groupThree, allTopicsList) + groupToPartitionMap.put(groupFour, null) + groupToPartitionMap.put(groupFive, null) + + createTopic(topic1) + createTopic(topic2, numPartitions = 2) + createTopic(topic3, numPartitions = 3) + + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupOneResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupTwoResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupThreeResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupFourResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupFiveResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicOneResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicTwoResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicThreeResource) + + val offset = 15L + val leaderEpoch: Optional[Integer] = Optional.of(1) + val metadata = "metadata" + val topicOneOffsets = topic1List.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + val topicOneAndTwoOffsets = topic1And2List.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + val allTopicOffsets = allTopicsList.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + + // create 5 consumers to commit offsets so we can fetch them later + + 
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupOne) + var consumer = createConsumer() + consumer.assign(topic1List) + consumer.commitSync(topicOneOffsets) + consumer.close() Review comment: Create a nested method to do this and call it for all cases. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ########## @@ -1580,21 +1623,83 @@ private OffsetCommitResponse createOffsetCommitResponse() { } private OffsetFetchRequest createOffsetFetchRequest(int version, boolean requireStable) { - return new OffsetFetchRequest.Builder("group1", requireStable, Collections.singletonList(new TopicPartition("test11", 1)), false) + if (version < 8) { + return new OffsetFetchRequest.Builder( + "group1", + requireStable, + Collections.singletonList( + new TopicPartition("test11", 1)), Review comment: nit: unnecessary newline ########## File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala ########## @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{BeforeEach, Test} + +import java.util +import java.util.Collections.singletonList +import scala.jdk.CollectionConverters._ +import java.util.{Optional, Properties} + +class OffsetFetchRequestTest extends BaseRequestTest{ + + override def brokerCount: Int = 1 + + val brokerId: Integer = 0 + val offset = 15L + val leaderEpoch: Optional[Integer] = Optional.of(3) + val metadata = "metadata" + + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) + properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") + properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") + properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") + properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") + properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") + } + + @BeforeEach + override def setUp(): Unit = { + doSetup(createOffsetsTopic = false) + + TestUtils.createOffsetsTopic(zkClient, servers) + } + + @Test + def testOffsetFetchRequestLessThanV8(): Unit = { + val topic = "topic" + createTopic(topic) + + val groupId = "groupId" + val tpList = singletonList(new TopicPartition(topic, 0)) + val topicOffsets = tpList.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + + consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) + val consumer = createConsumer() + consumer.assign(tpList) + consumer.commitSync(topicOffsets) + consumer.close() + // testing from version 1 onward since 
version 0 read offsets from ZK + for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) { + if (version < 8) { + val request = + if (version < 7) { + new OffsetFetchRequest.Builder( + groupId, false, tpList, false) + .build(version.asInstanceOf[Short]) + } else { + new OffsetFetchRequest.Builder( + groupId, false, tpList, true) + .build(version.asInstanceOf[Short]) + } + val response = connectAndReceive[OffsetFetchResponse](request) + val topicData = response.data().topics().get(0) + val partitionData = topicData.partitions().get(0) + if (version < 3) { + assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()) + } + assertEquals(Errors.NONE, response.error()) + assertEquals(topic, topicData.name()) + assertEquals(0, partitionData.partitionIndex()) + assertEquals(offset, partitionData.committedOffset()) + if (version >= 5) { + // committed leader epoch introduced with V5 + assertEquals(leaderEpoch.get(), partitionData.committedLeaderEpoch()) + } + assertEquals(metadata, partitionData.metadata()) + assertEquals(Errors.NONE.code(), partitionData.errorCode()) + } + } + } + + @Test + def testOffsetFetchRequestV8AndAbove(): Unit = { + val groupOne = "group1" + val groupTwo = "group2" + val groupThree = "group3" + val groupFour = "group4" + val groupFive = "group5" Review comment: If we move the single group case to the previous test, we can make this one work on a collection of groups. Instead of named groups like `groupOne`, we can just use group(1). ########## File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala ########## @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{BeforeEach, Test} + +import java.util +import java.util.Collections.singletonList +import scala.jdk.CollectionConverters._ +import java.util.{Optional, Properties} + +class OffsetFetchRequestTest extends BaseRequestTest{ + + override def brokerCount: Int = 1 + + val brokerId: Integer = 0 + val offset = 15L + val leaderEpoch: Optional[Integer] = Optional.of(3) + val metadata = "metadata" + + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) + properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") + properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") + properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") + properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") + properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") + } + + @BeforeEach + override def setUp(): Unit = { + doSetup(createOffsetsTopic = false) + + 
TestUtils.createOffsetsTopic(zkClient, servers) + } + + @Test + def testOffsetFetchRequestLessThanV8(): Unit = { Review comment: Can we make this for all versions with a single group id? That is the scenario in the consumer and we want to make sure the methods in request/response classes handle any version for that case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org