tombentley commented on a change in pull request #10743: URL: https://github.com/apache/kafka/pull/10743#discussion_r654335077
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> { + + private final CoordinatorKey groupId; + private final Map<TopicPartition, OffsetAndMetadata> offsets; + private final Logger log; + private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; + + public AlterConsumerGroupOffsetsHandler( + String groupId, + Map<TopicPartition, OffsetAndMetadata> offsets, + LogContext logContext + ) { + this.groupId = CoordinatorKey.byGroupId(groupId); + this.offsets = offsets; + this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class); + this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + } + + @Override + public String apiName() { + return "offsetCommit"; + } + + @Override + public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() { + return lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture( + String groupId + ) { + return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId))); + } + + @Override + public OffsetCommitRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) { Review comment: brokerId -> coordinatorId, to emphasize the point that we've found the coordinators at this point? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> { Review comment: Perhaps it's worth a line of doc to explain: "Sends the OffsetCommitRequest to each group coordinator based on the discovered coordinators for the relevant groups, and handles the results." ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java ########## @@ -32,11 +32,11 @@ */ @InterfaceStability.Evolving public class DeleteConsumerGroupOffsetsResult { - private final KafkaFuture<Map<TopicPartition, Errors>> future; + private final KafkaFutureImpl<Map<TopicPartition, Errors>> future; private final Set<TopicPartition> partitions; - DeleteConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) { + DeleteConsumerGroupOffsetsResult(KafkaFutureImpl<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) { Review comment: I don't think these changes are needed now. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetDeleteRequestData; +import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestPartition; +import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic; +import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.OffsetDeleteRequest; +import org.apache.kafka.common.requests.OffsetDeleteResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> { Review comment: To save me repeating what I said before about a line of Javadoc and param name -> coordinator, on this and the other AdminApiHandler subclasses? ########## File path: clients/src/test/java/org/apache/kafka/clients/MockClient.java ########## @@ -245,10 +246,17 @@ public void send(ClientRequest request, long now) { unsupportedVersionException = new UnsupportedVersionException( "Api " + request.apiKey() + " with version " + version); } else { - AbstractRequest abstractRequest = request.requestBuilder().build(version); - if (!futureResp.requestMatcher.matches(abstractRequest)) - throw new IllegalStateException("Request matcher did not match next-in-line request " - + abstractRequest + " with prepared response " + futureResp.responseBody); + try { + AbstractRequest abstractRequest = request.requestBuilder().build(version); + if (!futureResp.requestMatcher.matches(abstractRequest)) + continue; Review comment: Is it right that we've lost the ISE entirely here? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetDeleteRequestData; +import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestPartition; +import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic; +import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.OffsetDeleteRequest; +import org.apache.kafka.common.requests.OffsetDeleteResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> { + + private final CoordinatorKey groupId; + private final Set<TopicPartition> partitions; + private final Logger log; + private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; + + public DeleteConsumerGroupOffsetsHandler( + String groupId, + Set<TopicPartition> partitions, + LogContext logContext + ) { + this.groupId = CoordinatorKey.byGroupId(groupId); + this.partitions = partitions; + this.log = logContext.logger(DeleteConsumerGroupOffsetsHandler.class); + this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + } + + @Override + public String apiName() { + return "offsetDelete"; + } + + @Override + public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() { + return lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture( + String groupId + ) { + return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId))); + } + + @Override + public OffsetDeleteRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) { + final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection(); + partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add( + new OffsetDeleteRequestTopic() + .setName(topic) + .setPartitions(topicPartitions.stream() + .map(tp -> new OffsetDeleteRequestPartition().setPartitionIndex(tp.partition())) + .collect(Collectors.toList()) + ) + )); + + return new OffsetDeleteRequest.Builder( + new OffsetDeleteRequestData() + .setGroupId(groupId.idValue) + .setTopics(topics) + ); + } + + @Override + public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> groupIds, + AbstractResponse abstractResponse) { + + final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; + Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>(); + Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + List<CoordinatorKey> unmapped = new ArrayList<>(); + + final Errors error = Errors.forCode(response.data().errorCode()); + if (error != Errors.NONE) { + handleError(groupId, error, failed, unmapped); + } else { + final Map<TopicPartition, Errors> partitions = new HashMap<>(); + response.data().topics().forEach(topic -> + topic.partitions().forEach(partition -> { + Errors partitionError = Errors.forCode(partition.errorCode()); + boolean hasError = handleError(groupId, partitionError, failed, unmapped); Review comment: inline? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> { + + private final CoordinatorKey groupId; + private final Map<TopicPartition, OffsetAndMetadata> offsets; + private final Logger log; + private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; + + public AlterConsumerGroupOffsetsHandler( + String groupId, + Map<TopicPartition, OffsetAndMetadata> offsets, + LogContext logContext + ) { + this.groupId = CoordinatorKey.byGroupId(groupId); + this.offsets = offsets; + this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class); + this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + } + + @Override + public String apiName() { + return "offsetCommit"; + } + + @Override + public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() { + return lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture( + String groupId + ) { + return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId))); + } + + @Override + public OffsetCommitRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) { + List<OffsetCommitRequestTopic> topics = new ArrayList<>(); + Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>(); + for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { + String topic = entry.getKey().topic(); + OffsetAndMetadata oam = entry.getValue(); + offsetData.compute(topic, (key, value) -> { + if (value == null) { + value = new ArrayList<>(); + } + OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition() + .setCommittedOffset(oam.offset()) + .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1)) + .setCommittedMetadata(oam.metadata()) + .setPartitionIndex(entry.getKey().partition()); + value.add(partition); + return value; + }); Review comment: Wouldn't this be clearer as a computeIfAbsent()? ```suggestion OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition() .setCommittedOffset(oam.offset()) .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1)) .setCommittedMetadata(oam.metadata()) .setPartitionIndex(entry.getKey().partition()); offsetData.computeIfAbsent(topic, key -> new ArrayList<>()).add(partition); ``` ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> { + + private final CoordinatorKey groupId; + private final Map<TopicPartition, OffsetAndMetadata> offsets; + private final Logger log; + private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; + + public AlterConsumerGroupOffsetsHandler( + String groupId, + Map<TopicPartition, OffsetAndMetadata> offsets, + LogContext logContext + ) { + this.groupId = CoordinatorKey.byGroupId(groupId); + this.offsets = offsets; + this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class); + this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + } + + @Override + public String apiName() { + return "offsetCommit"; + } + + @Override + public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() { + return lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture( + String groupId + ) { + return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId))); + } + + @Override + public OffsetCommitRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) { + List<OffsetCommitRequestTopic> topics = new ArrayList<>(); + Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>(); + for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { + String topic = entry.getKey().topic(); + OffsetAndMetadata oam = entry.getValue(); + offsetData.compute(topic, (key, value) -> { + if (value == null) { + value = new ArrayList<>(); + } + OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition() + .setCommittedOffset(oam.offset()) + .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1)) + .setCommittedMetadata(oam.metadata()) + .setPartitionIndex(entry.getKey().partition()); + value.add(partition); + return value; + }); + } + for (Map.Entry<String, List<OffsetCommitRequestPartition>> entry : offsetData.entrySet()) { + OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic() + .setName(entry.getKey()) + .setPartitions(entry.getValue()); + topics.add(topic); + } + OffsetCommitRequestData data = new OffsetCommitRequestData() + .setGroupId(groupId.idValue) + .setTopics(topics); + return new OffsetCommitRequest.Builder(data); + } + + @Override + public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> groupIds, Review comment: Again, wdyt of `broker` -> `coordinator`? ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -1524,11 +1534,29 @@ String coordinatorKey() { @Override public void handleResponse(AbstractResponse response) { FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; - Errors error = findCoordinatorResponse.error(); CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType()); + if (batchFindCoordinator && findCoordinatorResponse.data().coordinators().size() != 1) { + log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); + fatalError(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator")); + } + String key = batchFindCoordinator + ? findCoordinatorResponse.data().coordinators().get(0).key() Review comment: Similarly a var for `findCoordinatorResponse.data().coordinators().get(0)` ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ########## @@ -813,34 +815,56 @@ public void handle(SyncGroupResponse syncResponse, */ private RequestFuture<Void> sendFindCoordinatorRequest(Node node) { // initiate the group metadata request - log.debug("Sending FindCoordinator request to broker {}", node); - FindCoordinatorRequest.Builder requestBuilder = - new FindCoordinatorRequest.Builder( - new FindCoordinatorRequestData() - .setKeyType(CoordinatorType.GROUP.id()) - .setKey(this.rebalanceConfig.groupId)); + log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator); + FindCoordinatorRequestData data = new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()); + if (batchFindCoordinator) { + data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId)); + } else { + data.setKey(this.rebalanceConfig.groupId); + } + FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data); return client.send(node, requestBuilder) - .compose(new FindCoordinatorResponseHandler()); + .compose(new FindCoordinatorResponseHandler(batchFindCoordinator)); } private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> { + private boolean batch; + FindCoordinatorResponseHandler(boolean batch) { + this.batch = batch; + } @Override public void onSuccess(ClientResponse resp, RequestFuture<Void> future) { log.debug("Received FindCoordinator response {}", resp); FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody(); - Errors error = findCoordinatorResponse.error(); + if (batch && findCoordinatorResponse.data().coordinators().size() != 1) { + log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); + future.raise(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator")); + } + Errors error = batch + ? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode()) + : findCoordinatorResponse.error(); if (error == Errors.NONE) { synchronized (AbstractCoordinator.this) { + int nodeId = batch + ? findCoordinatorResponse.data().coordinators().get(0).nodeId() Review comment: Factor out var for `findCoordinatorResponse.data().coordinators().get(0)`? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java ########## @@ -57,13 +58,13 @@ * Note that keys which received a retriable error should be left out of the * result. They will be retried automatically. * - * @param brokerId the brokerId that the associated request was sent to + * @param broker the broker that the associated request was sent to Review comment: Is it necessarily a broker, or could it be a kraft controller? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.message.DeleteGroupsRequestData; +import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DeleteGroupsRequest; +import org.apache.kafka.common.requests.DeleteGroupsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public class DeleteConsumerGroupsHandler implements AdminApiHandler<CoordinatorKey, Void> { + + private final Logger log; + private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; + + public DeleteConsumerGroupsHandler( + Set<String> groupIds, + LogContext logContext + ) { + this.log = logContext.logger(DeleteConsumerGroupsHandler.class); + this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + } + + @Override + public String apiName() { + return "deleteConsumerGroups"; + } + + @Override + public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() { + return lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Void> newFuture( + Collection<String> groupIds + ) { + return AdminApiFuture.forKeys(buildKeySet(groupIds)); + } + + private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) { + return groupIds.stream() + .map(CoordinatorKey::byGroupId) + .collect(Collectors.toSet()); + } + + @Override + public DeleteGroupsRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) { + List<String> groupIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList()); + DeleteGroupsRequestData data = new DeleteGroupsRequestData() + .setGroupsNames(new ArrayList<>(groupIds)); Review comment: Is the copy needed? ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ########## @@ -813,34 +815,56 @@ public void handle(SyncGroupResponse syncResponse, */ private RequestFuture<Void> sendFindCoordinatorRequest(Node node) { // initiate the group metadata request - log.debug("Sending FindCoordinator request to broker {}", node); - FindCoordinatorRequest.Builder requestBuilder = - new FindCoordinatorRequest.Builder( - new FindCoordinatorRequestData() - .setKeyType(CoordinatorType.GROUP.id()) - .setKey(this.rebalanceConfig.groupId)); + log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator); + FindCoordinatorRequestData data = new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()); + if (batchFindCoordinator) { + data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId)); + } else { + data.setKey(this.rebalanceConfig.groupId); + } + FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data); return client.send(node, requestBuilder) - .compose(new FindCoordinatorResponseHandler()); + .compose(new FindCoordinatorResponseHandler(batchFindCoordinator)); } private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> { + private boolean batch; + FindCoordinatorResponseHandler(boolean batch) { + this.batch = batch; + } @Override public void onSuccess(ClientResponse resp, RequestFuture<Void> future) { log.debug("Received FindCoordinator response {}", resp); FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody(); - Errors error = findCoordinatorResponse.error(); + if (batch && findCoordinatorResponse.data().coordinators().size() != 1) { + log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); + future.raise(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator")); + } + Errors error = batch + ? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode()) + : findCoordinatorResponse.error(); if (error == Errors.NONE) { synchronized (AbstractCoordinator.this) { + int nodeId = batch + ? findCoordinatorResponse.data().coordinators().get(0).nodeId() Review comment: Sorriy, I think I made an erroneous point like this before. It sucks that there's no good abstraction across the common parts of `FindCoordinatorResponseData.Coordinator` and `FindCoordinatorResponseData`. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java ########## @@ -57,13 +58,13 @@ * Note that keys which received a retriable error should be left out of the * result. They will be retried automatically. * - * @param brokerId the brokerId that the associated request was sent to + * @param broker the broker that the associated request was sent to Review comment: Well `process.roles` is documented like this in `config/kraft/README.md`: * If `process.roles` is set to `broker`, the server acts as a broker in KRaft mode. * If `process.roles` is set to `controller`, the server acts as a controller in KRaft mode. * If `process.roles` is set to `broker,controller`, the server acts as both a broker and a controller in KRaft mode. which suggests to me that we're using the term "broker" to mean "a thing which handles Produce and Fetch etc". (However, "controller" is a bit confusing there, since while we might have several servers in the "controller" role only one will be _the_ controller at any one time. I'm not aware of a good term for "server that is participating in the raft cluster, but might not be the current controller right now"). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org