tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r654335077



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
+
+    private final CoordinatorKey groupId;
+    private final Map<TopicPartition, OffsetAndMetadata> offsets;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public AlterConsumerGroupOffsetsHandler(
+        String groupId,
+        Map<TopicPartition, OffsetAndMetadata> offsets,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.offsets = offsets;
+        this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "offsetCommit";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(
+            String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public OffsetCommitRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) {

Review comment:
       Rename `brokerId` -> `coordinatorId`, to emphasize that the coordinators have 
already been found at this point?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,156 @@
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {

Review comment:
       Perhaps it's worth a line of doc to explain: "Sends the 
OffsetCommitRequest to each group coordinator based on the discovered 
coordinators for the relevant groups, and handles the results."

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java
##########
@@ -32,11 +32,11 @@
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupOffsetsResult {
-    private final KafkaFuture<Map<TopicPartition, Errors>> future;
+    private final KafkaFutureImpl<Map<TopicPartition, Errors>> future;
     private final Set<TopicPartition> partitions;
 
 
-    DeleteConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
+    DeleteConsumerGroupOffsetsResult(KafkaFutureImpl<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {

Review comment:
       I don't think these changes are needed now. 

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,145 @@
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestPartition;
+import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic;
+import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetDeleteRequest;
+import org.apache.kafka.common.requests.OffsetDeleteResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {

Review comment:
       To save me repeating myself: could you apply the earlier comments (a line of 
Javadoc, and renaming the param to `coordinator`) to this and the other 
AdminApiHandler implementations?

##########
File path: clients/src/test/java/org/apache/kafka/clients/MockClient.java
##########
@@ -245,10 +246,17 @@ public void send(ClientRequest request, long now) {
                 unsupportedVersionException = new UnsupportedVersionException(
                         "Api " + request.apiKey() + " with version " + version);
             } else {
-                AbstractRequest abstractRequest = request.requestBuilder().build(version);
-                if (!futureResp.requestMatcher.matches(abstractRequest))
-                    throw new IllegalStateException("Request matcher did not match next-in-line request "
-                            + abstractRequest + " with prepared response " + futureResp.responseBody);
+                try {
+                    AbstractRequest abstractRequest = request.requestBuilder().build(version);
+                    if (!futureResp.requestMatcher.matches(abstractRequest))
+                        continue;

Review comment:
       Is it right that we've lost the `IllegalStateException` entirely here?
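
       To make the question concrete, here is a minimal sketch of one way a mock could skip 
non-matching prepared responses and still fail loudly when nothing matches. 
Everything in it is hypothetical (`PreparedResponseSketch`, `Prepared`, `respond`, 
and the use of plain strings as requests); it is not MockClient's actual 
structure, only an illustration of the behaviour I'd expect to be preserved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

final class PreparedResponseSketch {

    // Hypothetical stand-in for a prepared response together with its request matcher.
    static final class Prepared {
        final Predicate<String> matcher;
        final String responseBody;

        Prepared(Predicate<String> matcher, String responseBody) {
            this.matcher = matcher;
            this.responseBody = responseBody;
        }
    }

    // Returns (and consumes) the first prepared response whose matcher accepts the request.
    // Non-matching entries are skipped, but if none match we still throw rather than
    // silently dropping the request.
    static String respond(List<Prepared> prepared, String request) {
        Iterator<Prepared> iterator = prepared.iterator();
        while (iterator.hasNext()) {
            Prepared candidate = iterator.next();
            if (!candidate.matcher.test(request))
                continue;
            iterator.remove();
            return candidate.responseBody;
        }
        throw new IllegalStateException("No prepared response matched request " + request);
    }

    public static void main(String[] args) {
        List<Prepared> prepared = new ArrayList<>();
        prepared.add(new Prepared(request -> request.equals("a"), "response-a"));
        prepared.add(new Prepared(request -> request.equals("b"), "response-b"));
        // The first entry does not match "b" and is skipped, not treated as an error.
        System.out.println(respond(prepared, "b"));
    }
}
```

Whether the exception belongs after the loop or somewhere else depends on how 
the rest of `send` is structured, which isn't visible in this hunk.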

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,145 @@
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestPartition;
+import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic;
+import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetDeleteRequest;
+import org.apache.kafka.common.requests.OffsetDeleteResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
+
+    private final CoordinatorKey groupId;
+    private final Set<TopicPartition> partitions;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public DeleteConsumerGroupOffsetsHandler(
+        String groupId,
+        Set<TopicPartition> partitions,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.partitions = partitions;
+        this.log = logContext.logger(DeleteConsumerGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "offsetDelete";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(
+            String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public OffsetDeleteRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) {
+        final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
+        partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
+            new OffsetDeleteRequestTopic()
+            .setName(topic)
+            .setPartitions(topicPartitions.stream()
+                .map(tp -> new OffsetDeleteRequestPartition().setPartitionIndex(tp.partition()))
+                .collect(Collectors.toList())
+            )
+        ));
+
+        return new OffsetDeleteRequest.Builder(
+            new OffsetDeleteRequestData()
+                .setGroupId(groupId.idValue)
+                .setTopics(topics)
+        );
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> groupIds,
+            AbstractResponse abstractResponse) {
+
+        final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
+        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        final Errors error = Errors.forCode(response.data().errorCode());
+        if (error != Errors.NONE) {
+            handleError(groupId, error, failed, unmapped);
+        } else {
+            final Map<TopicPartition, Errors> partitions = new HashMap<>();
+            response.data().topics().forEach(topic -> 
+                topic.partitions().forEach(partition -> {
+                    Errors partitionError = Errors.forCode(partition.errorCode());
+                    boolean hasError = handleError(groupId, partitionError, failed, unmapped);

Review comment:
       inline?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,156 @@
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
+
+    private final CoordinatorKey groupId;
+    private final Map<TopicPartition, OffsetAndMetadata> offsets;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public AlterConsumerGroupOffsetsHandler(
+        String groupId,
+        Map<TopicPartition, OffsetAndMetadata> offsets,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.offsets = offsets;
+        this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "offsetCommit";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(
+            String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public OffsetCommitRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) {
+        List<OffsetCommitRequestTopic> topics = new ArrayList<>();
+        Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>();
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            String topic = entry.getKey().topic();
+            OffsetAndMetadata oam = entry.getValue();
+            offsetData.compute(topic, (key, value) -> {
+                if (value == null) {
+                    value = new ArrayList<>();
+                }
+                OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
+                        .setCommittedOffset(oam.offset())
+                        .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
+                        .setCommittedMetadata(oam.metadata())
+                        .setPartitionIndex(entry.getKey().partition());
+                value.add(partition);
+                return value;
+            });

Review comment:
       Wouldn't this be clearer as a computeIfAbsent()?
   ```suggestion
               OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
                       .setCommittedOffset(oam.offset())
                       .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
                       .setCommittedMetadata(oam.metadata())
                       .setPartitionIndex(entry.getKey().partition());
               offsetData.computeIfAbsent(topic, key -> new ArrayList<>()).add(partition);
   ```
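
       For illustration, a standalone sketch of the two grouping styles side by side. It 
uses plain `String`/`Integer` pairs as hypothetical stand-ins for the Kafka 
types (`GroupByTopicSketch` and its method names are not from the PR), and only 
aims to show that both forms build the same map while `computeIfAbsent` avoids 
the explicit null check.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GroupByTopicSketch {

    // Groups partition indexes by topic with compute(), mirroring the shape of the code under review.
    static Map<String, List<Integer>> withCompute(List<Map.Entry<String, Integer>> topicPartitions) {
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (Map.Entry<String, Integer> tp : topicPartitions) {
            grouped.compute(tp.getKey(), (key, value) -> {
                if (value == null) {
                    value = new ArrayList<>();
                }
                value.add(tp.getValue());
                return value;
            });
        }
        return grouped;
    }

    // Same grouping with computeIfAbsent(): the list is created on first use, then appended to.
    static Map<String, List<Integer>> withComputeIfAbsent(List<Map.Entry<String, Integer>> topicPartitions) {
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (Map.Entry<String, Integer> tp : topicPartitions) {
            grouped.computeIfAbsent(tp.getKey(), key -> new ArrayList<>()).add(tp.getValue());
        }
        return grouped;
    }

    // Hypothetical sample input: (topic, partition) pairs.
    static List<Map.Entry<String, Integer>> sampleInput() {
        List<Map.Entry<String, Integer>> input = new ArrayList<>();
        input.add(new AbstractMap.SimpleEntry<>("orders", 0));
        input.add(new AbstractMap.SimpleEntry<>("orders", 1));
        input.add(new AbstractMap.SimpleEntry<>("payments", 0));
        return input;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = sampleInput();
        System.out.println(withCompute(input).equals(withComputeIfAbsent(input)));
    }
}
```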

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,156 @@
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
+
+    private final CoordinatorKey groupId;
+    private final Map<TopicPartition, OffsetAndMetadata> offsets;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public AlterConsumerGroupOffsetsHandler(
+        String groupId,
+        Map<TopicPartition, OffsetAndMetadata> offsets,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.offsets = offsets;
+        this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "offsetCommit";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(
+            String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public OffsetCommitRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) {
+        List<OffsetCommitRequestTopic> topics = new ArrayList<>();
+        Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>();
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            String topic = entry.getKey().topic();
+            OffsetAndMetadata oam = entry.getValue();
+            offsetData.compute(topic, (key, value) -> {
+                if (value == null) {
+                    value = new ArrayList<>();
+                }
+                OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
+                        .setCommittedOffset(oam.offset())
+                        .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
+                        .setCommittedMetadata(oam.metadata())
+                        .setPartitionIndex(entry.getKey().partition());
+                value.add(partition);
+                return value;
+            });
+        }
+        for (Map.Entry<String, List<OffsetCommitRequestPartition>> entry : offsetData.entrySet()) {
+            OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic()
+                    .setName(entry.getKey())
+                    .setPartitions(entry.getValue());
+            topics.add(topic);
+        }
+        OffsetCommitRequestData data = new OffsetCommitRequestData()
+            .setGroupId(groupId.idValue)
+            .setTopics(topics);
+        return new OffsetCommitRequest.Builder(data);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> groupIds,

Review comment:
       Again, what do you think of `broker` -> `coordinator`?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1524,11 +1534,29 @@ String coordinatorKey() {
         @Override
         public void handleResponse(AbstractResponse response) {
             FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
-            Errors error = findCoordinatorResponse.error();
             CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType());
 
+            if (batchFindCoordinator && findCoordinatorResponse.data().coordinators().size() != 1) {
+                log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator");
+                fatalError(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator"));
+            }
+            String key = batchFindCoordinator
+                    ? findCoordinatorResponse.data().coordinators().get(0).key()

Review comment:
       Similarly, factor out a var for 
`findCoordinatorResponse.data().coordinators().get(0)`?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -813,34 +815,56 @@ public void handle(SyncGroupResponse syncResponse,
      */
     private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending FindCoordinator request to broker {}", node);
-        FindCoordinatorRequest.Builder requestBuilder =
-                new FindCoordinatorRequest.Builder(
-                        new FindCoordinatorRequestData()
-                            .setKeyType(CoordinatorType.GROUP.id())
-                            .setKey(this.rebalanceConfig.groupId));
+        log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(CoordinatorType.GROUP.id());
+        if (batchFindCoordinator) {
+            data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId));
+        } else {
+            data.setKey(this.rebalanceConfig.groupId);
+        }
+        FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data);
         return client.send(node, requestBuilder)
-                .compose(new FindCoordinatorResponseHandler());
+                .compose(new FindCoordinatorResponseHandler(batchFindCoordinator));
     }
 
     private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
+        private boolean batch;
+        FindCoordinatorResponseHandler(boolean batch) {
+            this.batch = batch;
+        }
 
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
             log.debug("Received FindCoordinator response {}", resp);
 
             FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
-            Errors error = findCoordinatorResponse.error();
+            if (batch && findCoordinatorResponse.data().coordinators().size() != 1) {
+                log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator");
+                future.raise(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator"));
+            }
+            Errors error = batch
+                    ? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode())
+                    : findCoordinatorResponse.error();
             if (error == Errors.NONE) {
                 synchronized (AbstractCoordinator.this) {
+                    int nodeId = batch
+                            ? findCoordinatorResponse.data().coordinators().get(0).nodeId()

Review comment:
       Factor out var for 
`findCoordinatorResponse.data().coordinators().get(0)`?
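
       Roughly what I have in mind, as a standalone sketch: look the single coordinator up 
once and read its fields from a local variable. The `Coordinator` class below 
is a hypothetical stand-in for an entry of the response's `coordinators()` 
list, and throwing directly is a simplification; the real handler would go 
through `future.raise(...)` instead.

```java
import java.util.Collections;
import java.util.List;

final class SingleCoordinatorSketch {

    // Hypothetical stand-in for one entry in the response's coordinators() list.
    static final class Coordinator {
        final String key;
        final int nodeId;
        final short errorCode;

        Coordinator(String key, int nodeId, short errorCode) {
            this.key = key;
            this.nodeId = nodeId;
            this.errorCode = errorCode;
        }
    }

    // Checks that exactly one coordinator is present and returns it, so callers
    // do not have to repeat coordinators.get(0) for every field they read.
    static Coordinator onlyCoordinator(List<Coordinator> coordinators) {
        if (coordinators.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one coordinator in the response but got " + coordinators.size());
        }
        return coordinators.get(0);
    }

    // Reads several fields from the single local variable.
    static String describe(List<Coordinator> coordinators) {
        Coordinator coordinator = onlyCoordinator(coordinators);
        return coordinator.key + "@" + coordinator.nodeId + " error=" + coordinator.errorCode;
    }

    public static void main(String[] args) {
        List<Coordinator> coordinators =
            Collections.singletonList(new Coordinator("group-a", 3, (short) 0));
        System.out.println(describe(coordinators));
    }
}
```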

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##########
@@ -57,13 +58,13 @@
      * Note that keys which received a retriable error should be left out of the
      * result. They will be retried automatically.
      *
-     * @param brokerId the brokerId that the associated request was sent to
+     * @param broker the broker that the associated request was sent to

Review comment:
       Is it necessarily a broker, or could it be a KRaft controller?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -0,0 +1,124 @@
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class DeleteConsumerGroupsHandler implements 
AdminApiHandler<CoordinatorKey, Void> {
+
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public DeleteConsumerGroupsHandler(
+        Set<String> groupIds,
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(DeleteConsumerGroupsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "deleteConsumerGroups";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Void> 
newFuture(
+            Collection<String> groupIds
+    ) {
+        return AdminApiFuture.forKeys(buildKeySet(groupIds));
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> 
groupIds) {
+        return groupIds.stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public DeleteGroupsRequest.Builder buildRequest(int brokerId, 
Set<CoordinatorKey> keys) {
+        List<String> groupIds = keys.stream().map(key -> 
key.idValue).collect(Collectors.toList());
+        DeleteGroupsRequestData data = new DeleteGroupsRequestData()
+                .setGroupsNames(new ArrayList<>(groupIds));

Review comment:
       Is the copy needed?
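
To illustrate the question, here is a minimal standalone sketch (not the PR's code). Plain strings stand in for `CoordinatorKey.idValue`, and the class and method names are hypothetical. It shows that `Collectors.toList()` already yields a list detached from the source set, so an extra `new ArrayList<>(...)` would only matter if the caller needs a guaranteed-mutable `ArrayList`, which `Collectors.toList()` does not promise.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class GroupNamesCopyCheck {

    // Mirrors the stream in buildRequest; the identity map stands in for key -> key.idValue.
    static List<String> collectGroupIds(Set<String> keys) {
        return keys.stream().map(key -> key).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> keys = new LinkedHashSet<>(Arrays.asList("group-a", "group-b"));
        List<String> groupIds = collectGroupIds(keys);

        // Changing the source set afterwards does not change the collected list.
        keys.add("group-c");
        System.out.println(groupIds.size()); // prints 2
    }
}
```

If that holds for the handler too, passing `groupIds` straight to `setGroupsNames` should be equivalent, assuming the setter does not rely on a specific list implementation.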

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -813,34 +815,56 @@ public void handle(SyncGroupResponse syncResponse,
      */
     private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending FindCoordinator request to broker {}", node);
-        FindCoordinatorRequest.Builder requestBuilder =
-                new FindCoordinatorRequest.Builder(
-                        new FindCoordinatorRequestData()
-                            .setKeyType(CoordinatorType.GROUP.id())
-                            .setKey(this.rebalanceConfig.groupId));
+        log.debug("Sending FindCoordinator request to broker {} with 
batch={}", node, batchFindCoordinator);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(CoordinatorType.GROUP.id());
+        if (batchFindCoordinator) {
+            
data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId));
+        } else {
+            data.setKey(this.rebalanceConfig.groupId);
+        }
+        FindCoordinatorRequest.Builder requestBuilder = new 
FindCoordinatorRequest.Builder(data);
         return client.send(node, requestBuilder)
-                .compose(new FindCoordinatorResponseHandler());
+                .compose(new 
FindCoordinatorResponseHandler(batchFindCoordinator));
     }
 
     private class FindCoordinatorResponseHandler extends 
RequestFutureAdapter<ClientResponse, Void> {
+        private boolean batch;
+        FindCoordinatorResponseHandler(boolean batch) {
+            this.batch = batch;
+        }
 
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) 
{
             log.debug("Received FindCoordinator response {}", resp);
 
             FindCoordinatorResponse findCoordinatorResponse = 
(FindCoordinatorResponse) resp.responseBody();
-            Errors error = findCoordinatorResponse.error();
+            if (batch && findCoordinatorResponse.data().coordinators().size() 
!= 1) {
+                log.error("Group coordinator lookup failed: Invalid response 
containing more than a single coordinator");
+                future.raise(new IllegalStateException("Group coordinator 
lookup failed: Invalid response containing more than a single coordinator"));
+            }
+            Errors error = batch
+                    ? 
Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode())
+                    : findCoordinatorResponse.error();
             if (error == Errors.NONE) {
                 synchronized (AbstractCoordinator.this) {
+                    int nodeId = batch
+                            ? 
findCoordinatorResponse.data().coordinators().get(0).nodeId()

Review comment:
       Sorry, I think I made an erroneous point like this before. It sucks 
that there's no good abstraction across the common parts of 
`FindCoordinatorResponseData.Coordinator` and `FindCoordinatorResponseData`. 
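
For what it's worth, one shape such an abstraction could take is a small read-only view built from either form. This is only a sketch: `SingleResponse` and `BatchedCoordinator` are stand-ins for the generated classes, and `CoordinatorView` is a hypothetical name, not something that exists in Kafka.

```java
public class CoordinatorViewSketch {

    // Stand-in for the top-level fields of FindCoordinatorResponseData (single-key response).
    static final class SingleResponse {
        final short errorCode;
        final int nodeId;
        final String host;
        final int port;

        SingleResponse(short errorCode, int nodeId, String host, int port) {
            this.errorCode = errorCode;
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }
    }

    // Stand-in for FindCoordinatorResponseData.Coordinator (one entry of a batched response).
    static final class BatchedCoordinator {
        final String key;
        final short errorCode;
        final int nodeId;
        final String host;
        final int port;

        BatchedCoordinator(String key, short errorCode, int nodeId, String host, int port) {
            this.key = key;
            this.errorCode = errorCode;
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }
    }

    // Hypothetical common view over the fields both forms share.
    static final class CoordinatorView {
        final short errorCode;
        final int nodeId;
        final String host;
        final int port;

        private CoordinatorView(short errorCode, int nodeId, String host, int port) {
            this.errorCode = errorCode;
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }

        static CoordinatorView of(SingleResponse r) {
            return new CoordinatorView(r.errorCode, r.nodeId, r.host, r.port);
        }

        static CoordinatorView of(BatchedCoordinator c) {
            return new CoordinatorView(c.errorCode, c.nodeId, c.host, c.port);
        }

        // Error code 0 corresponds to Errors.NONE.
        boolean hasError() {
            return errorCode != 0;
        }
    }

    public static void main(String[] args) {
        CoordinatorView single = CoordinatorView.of(
                new SingleResponse((short) 0, 1, "host-a", 9092));
        CoordinatorView batched = CoordinatorView.of(
                new BatchedCoordinator("my-group", (short) 0, 1, "host-a", 9092));

        // Downstream code can now read nodeId/host/port without branching on `batch`.
        System.out.println(single.nodeId == batched.nodeId && single.host.equals(batched.host));
    }
}
```

With something like this, the `batch ? ... : ...` branches would collapse into one place where the view is built.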

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##########
@@ -57,13 +58,13 @@
      * Note that keys which received a retriable error should be left out of 
the
      * result. They will be retried automatically.
      *
-     * @param brokerId the brokerId that the associated request was sent to
+     * @param broker the broker that the associated request was sent to

Review comment:
       Well `process.roles` is documented like this in `config/kraft/README.md`:
   
   * If `process.roles` is set to `broker`, the server acts as a broker in 
KRaft mode.
   * If `process.roles` is set to `controller`, the server acts as a controller 
in KRaft mode.
   * If `process.roles` is set to `broker,controller`, the server acts as both 
a broker and a controller in KRaft mode.
   
   which suggests to me that we're using the term "broker" to mean "a thing 
which handles Produce and Fetch, etc.". (However, "controller" is a bit 
confusing there: while we might have several servers in the "controller" role, 
only one will be _the_ controller at any one time. I'm not aware of a good term 
for "server that is participating in the Raft cluster, but might not be the 
current controller right now".)



