tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r646585310



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
##########
@@ -40,7 +43,11 @@
      * individual deletions.
      */
     public Map<String, KafkaFuture<Void>> deletedGroups() {
-        return futures;
+        Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>();

Review comment:
       ```suggestion
           Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(this.futures.size());
       ```
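
       For context, a hedged sketch of what the full accessor could look like with the pre-sized map; everything after the first line of the body is an assumption (only that first line appears in the diff), including that `futures` is still keyed by group id:

```java
public Map<String, KafkaFuture<Void>> deletedGroups() {
    // Pre-size the map to avoid rehashing while copying.
    // The copy below is assumed, not shown in the diff.
    Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(this.futures.size());
    deletedGroups.putAll(this.futures);
    return deletedGroups;
}
```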

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##########
@@ -24,7 +24,7 @@
     public final String idValue;
     public final FindCoordinatorRequest.CoordinatorType type;
 
-    private CoordinatorKey(String idValue, FindCoordinatorRequest.CoordinatorType type) {
+    public CoordinatorKey(String idValue, FindCoordinatorRequest.CoordinatorType type) {

Review comment:
       To me it would be more natural to specify the `type` first, then the `idValue`.
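
       A sketch of the reordered signature (hypothetical; the constructor body is assumed from the public final fields shown above):

```java
public CoordinatorKey(FindCoordinatorRequest.CoordinatorType type, String idValue) {
    // Same fields, just with the type leading the id it qualifies.
    this.type = type;
    this.idValue = idValue;
}
```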

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+public class DescribeConsumerGroupsHandler implements AdminApiHandler<CoordinatorKey, ConsumerGroupDescription> {
+
+    private final boolean includeAuthorizedOperations;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public DescribeConsumerGroupsHandler(
+        Set<String> groupIds,

Review comment:
       Unused
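
       A hedged sketch of the constructor with the unused `groupIds` parameter dropped; the remaining parameters and assignments are assumptions based on the fields shown above, not the actual signature in the PR:

```java
public DescribeConsumerGroupsHandler(
    boolean includeAuthorizedOperations,
    LogContext logContext
) {
    this.includeAuthorizedOperations = includeAuthorizedOperations;
    this.log = logContext.logger(DescribeConsumerGroupsHandler.class);
    // Assumes the lookup strategy can be built from the LogContext alone.
    this.lookupStrategy = new CoordinatorStrategy(logContext);
}
```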

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##########
@@ -267,7 +278,11 @@ public void onFailure(
     private void clearInflightRequest(long currentTimeMs, RequestSpec<K> spec) {
         RequestState requestState = requestStates.get(spec.scope);
         if (requestState != null) {
-            requestState.clearInflight(currentTimeMs);
+            if (spec.scope instanceof FulfillmentScope) {
+                requestState.clearInflight(currentTimeMs + retryBackoffMs);
+            } else {
+                requestState.clearInflight(currentTimeMs);

Review comment:
       Why the differing behaviour depending on the scope? There's no need to 
back off on find coordinator requests?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##########
@@ -63,7 +64,7 @@
      *
      * @return result indicating key completion, failure, and unmapping
      */
-    ApiResult<K, V> handleResponse(int brokerId, Set<K> keys, AbstractResponse response);
+    ApiResult<K, V> handleResponse(int brokerId, Set<K> keys, AbstractResponse response, Node node);

Review comment:
       Needs `@param` javadoc tag
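
       For example, a sketch of the updated javadoc (the tag description wording is only a suggestion, not from the source):

```java
/**
 * ...
 * @param node the broker node that sent the response
 *
 * @return result indicating key completion, failure, and unmapping
 */
ApiResult<K, V> handleResponse(int brokerId, Set<K> keys, AbstractResponse response, Node node);
```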

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##########
@@ -204,7 +206,8 @@ private void completeLookup(Map<K, Integer> brokerIdMapping) {
     public void onResponse(
         long currentTimeMs,
         RequestSpec<K> spec,
-        AbstractResponse response
+        AbstractResponse response,
+        Node node

Review comment:
       Need to fix the Javadoc for `#poll()`, which now has a broken link to this method.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -40,61 +54,116 @@ public CoordinatorStrategy(
 
     @Override
     public ApiRequestScope lookupScope(CoordinatorKey key) {
-        // The `FindCoordinator` API does not support batched lookups, so we use a
-        // separate lookup context for each coordinator key we need to lookup
-        return new LookupRequestScope(key);
+        if (batch) {
+            if (key.type == CoordinatorType.GROUP) {
+                return GROUP_REQUEST_SCOPE;
+            } else {
+                return TXN_REQUEST_SCOPE;
+            }
+        } else {
+            // If the `FindCoordinator` API does not support batched lookups, we use a
+            // separate lookup context for each coordinator key we need to lookup
+            return new LookupRequestScope(key);
+        }
     }
 
     @Override
     public FindCoordinatorRequest.Builder buildRequest(Set<CoordinatorKey> keys) {
-        CoordinatorKey key = requireSingleton(keys);
-        return new FindCoordinatorRequest.Builder(
-            new FindCoordinatorRequestData()
-                .setKey(key.idValue)
-                .setKeyType(key.type.id())
-        );
+        unrepresentableKeys = keys.stream().filter(k -> !isRepresentableKey(k.idValue)).collect(Collectors.toSet());
+        keys = keys.stream().filter(k -> isRepresentableKey(k.idValue)).collect(Collectors.toSet());
+        if (batch) {
+            keys = requireSameType(keys);
+            type = keys.iterator().next().type;

Review comment:
       We can be sure that keys won't be empty, right?
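
       If that invariant isn't guaranteed, a hypothetical guard helper (name and placement are suggestions, not part of the diff) could make the assumption explicit:

```java
private static FindCoordinatorRequest.CoordinatorType requireSingleType(Set<CoordinatorKey> keys) {
    // Fail fast instead of hitting NoSuchElementException on iterator().next()
    if (keys.isEmpty()) {
        throw new IllegalArgumentException("Unexpected empty set of coordinator keys");
    }
    return keys.iterator().next().type;
}
```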

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1305,66 +1305,102 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
+    val version = request.header.apiVersion
+    if (version < 4) {
+      handleFindCoordinatorRequestLessThanV4(request)
+    } else {
+      handleFindCoordinatorRequestV4AndAbove(request)

Review comment:
       Do we really need these separate methods? I would have thought we could just wrap the `findCoordinatorRequest.data.key` in a singleton List for the `version < 4` case, and otherwise they'd be handled the same. If there is a reason, I think it deserves a comment.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##########
@@ -250,6 +254,13 @@ public void onFailure(
                 .filter(future.lookupKeys()::contains)
                 .collect(Collectors.toSet());
             retryLookup(keysToUnmap);
+
+        } else if (t instanceof UnsupportedBatchLookupException) {
+            ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();

Review comment:
       The need for this typecast (i.e. the coupling between the strategy and 
the exception) makes me think that the name of 
`UnsupportedBatchLookupException` isn't ideal. It's pretty specific to the 
`FindCoordinators` API, but that's not really conveyed by its name. Maybe 
something like `NoBatchedFindCoordinatorsException`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -858,6 +885,12 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
         public void onFailure(RuntimeException e, RequestFuture<Void> future) {
             log.debug("FindCoordinator request failed due to {}", 
e.toString());
 
+            if (e instanceof UnsupportedBatchLookupException) {

Review comment:
       Is there some reason why we can't pass the `ApiVersions` into the 
coordinator, so we can get the batching right without needing to retry like 
this?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -17,20 +17,34 @@
 package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {
+
+    private static final ApiRequestScope GROUP_REQUEST_SCOPE = new ApiRequestScope() { };
+    private static final ApiRequestScope TXN_REQUEST_SCOPE = new ApiRequestScope() { };
+
     private final Logger log;
+    private boolean batch = true;
+    private FindCoordinatorRequest.CoordinatorType type;

Review comment:
       Why can't the `type` be set once in the constructor (since all the call 
sites would appear to know what type they're interested in), rather than 
needing to be mutable?
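
       A sketch of what a constructor-injected, immutable `type` could look like (hypothetical fragment; the existing constructor arguments aren't shown in this diff):

```java
private final Logger log;
private final FindCoordinatorRequest.CoordinatorType type;  // fixed at construction time
private boolean batch = true;

public CoordinatorStrategy(
    FindCoordinatorRequest.CoordinatorType type,
    LogContext logContext
) {
    this.type = type;
    this.log = logContext.logger(CoordinatorStrategy.class);
}
```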




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

