dajac commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r655911095



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
##########
@@ -29,9 +32,9 @@
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-    private final Map<String, KafkaFuture<Void>> futures;
+    private final Map<CoordinatorKey, KafkaFutureImpl<Void>> futures;
 
-    DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
+    DeleteConsumerGroupsResult(Map<CoordinatorKey, KafkaFutureImpl<Void>> futures) {

Review comment:
       Do we really need `KafkaFutureImpl` here? It would be better to use 
`KafkaFuture` if we can.
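
   For example, the field and constructor could take the interface type (just a sketch; I am assuming nothing `KafkaFutureImpl`-specific is needed inside the result class):

   ```java
   private final Map<CoordinatorKey, KafkaFuture<Void>> futures;

   DeleteConsumerGroupsResult(final Map<CoordinatorKey, KafkaFuture<Void>> futures) {
       this.futures = futures;
   }
   ```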

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
##########
@@ -40,7 +43,11 @@
      * individual deletions.
      */
     public Map<String, KafkaFuture<Void>> deletedGroups() {
-        return futures;
+        Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(futures.size());
+        for (Map.Entry<CoordinatorKey, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
+            deletedGroups.put(entry.getKey().idValue, entry.getValue());
+        }

Review comment:
       nit: We tend to use `forEach` nowadays which makes the code a bit less 
verbose. Up to you.
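
   For example, the `forEach` form could look like this (untested sketch):

   ```java
   Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(futures.size());
   futures.forEach((key, future) -> deletedGroups.put(key.idValue, future));
   return deletedGroups;
   ```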

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
##########
@@ -34,39 +35,40 @@
 @InterfaceStability.Evolving
 public class DescribeConsumerGroupsResult {
 
-    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
+    private final Map<CoordinatorKey, KafkaFutureImpl<ConsumerGroupDescription>> futures;
 
-    public DescribeConsumerGroupsResult(final Map<String, KafkaFuture<ConsumerGroupDescription>> futures) {
+    public DescribeConsumerGroupsResult(Map<CoordinatorKey, KafkaFutureImpl<ConsumerGroupDescription>> futures) {

Review comment:
       Same question regarding `KafkaFutureImpl`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##########
@@ -267,7 +278,11 @@ public void onFailure(
     private void clearInflightRequest(long currentTimeMs, RequestSpec<K> spec) {
         RequestState requestState = requestStates.get(spec.scope);
         if (requestState != null) {
-            requestState.clearInflight(currentTimeMs);
+            if (spec.scope instanceof FulfillmentScope) {
+                requestState.clearInflight(currentTimeMs + retryBackoffMs);
+            } else {
+                requestState.clearInflight(currentTimeMs);

Review comment:
       I think that this is worth a comment as it is not obvious.
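
   If I read the intent correctly, something along these lines (the rationale in the comments is my guess, so please correct it if it is off):

   ```java
   if (spec.scope instanceof FulfillmentScope) {
       // Failed fulfillment requests are retried against the same coordinator,
       // so apply the retry backoff before the next attempt.
       requestState.clearInflight(currentTimeMs + retryBackoffMs);
   } else {
       // Lookup requests can be retried right away.
       requestState.clearInflight(currentTimeMs);
   }
   ```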

##########
File path: clients/src/test/java/org/apache/kafka/clients/MockClient.java
##########
@@ -245,10 +246,18 @@ public void send(ClientRequest request, long now) {
                 unsupportedVersionException = new UnsupportedVersionException(
                         "Api " + request.apiKey() + " with version " + 
version);
             } else {
-                AbstractRequest abstractRequest = 
request.requestBuilder().build(version);
-                if (!futureResp.requestMatcher.matches(abstractRequest))
-                    throw new IllegalStateException("Request matcher did not 
match next-in-line request "
-                            + abstractRequest + " with prepared response " + 
futureResp.responseBody);
+                try {
+                    AbstractRequest abstractRequest = 
request.requestBuilder().build(version);
+                    if (!futureResp.requestMatcher.matches(abstractRequest))
+                        throw new IllegalStateException("Request matcher did 
not match next-in-line request "
+                                + abstractRequest + " with prepared response " 
+ futureResp.responseBody);
+                } catch (NoBatchedFindCoordinatorsException uble) {

Review comment:
       I wonder if we should catch `UnsupportedVersionException` here to handle 
the general case. Have you considered it? Also, the name `uble` looks weird. Is 
it intentional?
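
   For the general case it could look roughly like this (sketch; I am assuming the catch body just records the exception in `unsupportedVersionException`, like the branch above does):

   ```java
   try {
       AbstractRequest abstractRequest = request.requestBuilder().build(version);
       if (!futureResp.requestMatcher.matches(abstractRequest))
           throw new IllegalStateException("Request matcher did not match next-in-line request "
                   + abstractRequest + " with prepared response " + futureResp.responseBody);
   } catch (UnsupportedVersionException e) {
       // Also covers NoBatchedFindCoordinatorsException since it extends
       // UnsupportedVersionException.
       unsupportedVersionException = e;
   }
   ```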

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -17,84 +17,160 @@
 package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {
+
+    private static final ApiRequestScope GROUP_REQUEST_SCOPE = new ApiRequestScope() { };
+    private static final ApiRequestScope TXN_REQUEST_SCOPE = new ApiRequestScope() { };
+
     private final Logger log;
+    private final FindCoordinatorRequest.CoordinatorType type;
+    private Set<CoordinatorKey> unrepresentableKeys = Collections.emptySet();
+
+    boolean batch = true;
 
     public CoordinatorStrategy(
+        FindCoordinatorRequest.CoordinatorType type,
         LogContext logContext
     ) {
+        this.type = type;
         this.log = logContext.logger(CoordinatorStrategy.class);
     }
 
     @Override
     public ApiRequestScope lookupScope(CoordinatorKey key) {
-        // The `FindCoordinator` API does not support batched lookups, so we use a
-        // separate lookup context for each coordinator key we need to lookup
-        return new LookupRequestScope(key);
+        if (batch) {
+            if (type == CoordinatorType.GROUP) {
+                return GROUP_REQUEST_SCOPE;
+            } else {
+                return TXN_REQUEST_SCOPE;
+            }

Review comment:
       nit: I wonder if differentiating the two is required here. It seems that 
we don't use them anywhere else so we could use one common `REQUEST_SCOPE` for 
both cases.
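
   i.e. something like this (sketch; I am assuming the non-batched fallback keeps using a per-key scope as it does today):

   ```java
   private static final ApiRequestScope BATCH_REQUEST_SCOPE = new ApiRequestScope() { };

   @Override
   public ApiRequestScope lookupScope(CoordinatorKey key) {
       if (batch) {
           // A single shared scope is enough: a given strategy instance only
           // ever looks up keys of one coordinator type.
           return BATCH_REQUEST_SCOPE;
       }
       // Old brokers cannot batch FindCoordinator, so fall back to one scope per key.
       return new LookupRequestScope(key);
   }
   ```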

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/NoBatchedFindCoordinatorsException.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that it is not possible to lookup coordinators in batches with FindCoordinator. Instead
+ * coordinators must be looked up one by one.
+ */
+public class NoBatchedFindCoordinatorsException extends UnsupportedVersionException {

Review comment:
       If I remember correctly, this new exception was not specified in the KIP. Should we update the KIP and notify the discussion thread on the mailing list?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4806,7 +4275,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
             @Override
             void handleResponse(AbstractResponse response) {
                 long currentTimeMs = time.milliseconds();
-                driver.onResponse(currentTimeMs, spec, response);
+                driver.onResponse(currentTimeMs, spec, response, nodeProvider.provide());

Review comment:
       Does getting the node with `nodeProvider.provide()` work in all cases? I suppose that it does not guarantee returning the correct node if `LeastLoadedNodeProvider` is used. Am I getting this right?
   
   Looking at the code, I am not sure I understand why/where we need this. Could you elaborate a bit?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##########
@@ -24,7 +24,7 @@
     public final String idValue;
     public final FindCoordinatorRequest.CoordinatorType type;
 
-    private CoordinatorKey(String idValue, FindCoordinatorRequest.CoordinatorType type) {
+    public CoordinatorKey(FindCoordinatorRequest.CoordinatorType type, String idValue) {

Review comment:
       nit: Could we keep this private? I think that the intent was to use the 
static methods below to construct it.
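
   For reference, roughly (sketch; I am assuming the existing static factories below look something like `byGroupId`):

   ```java
   private CoordinatorKey(FindCoordinatorRequest.CoordinatorType type, String idValue) {
       this.type = type;
       this.idValue = idValue;
   }

   public static CoordinatorKey byGroupId(String groupId) {
       return new CoordinatorKey(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
   }
   ```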



