jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1614958193


##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -17,108 +17,196 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
 public class RequestManager {
-    private final Map<Integer, ConnectionState> connections = new HashMap<>();
-    private final List<Integer> voters = new ArrayList<>();
+    private final Map<String, ConnectionState> connections = new HashMap<>();
+    private final ArrayList<Node> bootstrapServers;
 
     private final int retryBackoffMs;
     private final int requestTimeoutMs;
     private final Random random;
 
-    public RequestManager(Set<Integer> voterIds,
-                          int retryBackoffMs,
-                          int requestTimeoutMs,
-                          Random random) {
-
+    public RequestManager(
+        Collection<Node> bootstrapServers,
+        int retryBackoffMs,
+        int requestTimeoutMs,
+        Random random
+    ) {
+        this.bootstrapServers = new ArrayList<>(bootstrapServers);
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
-        this.voters.addAll(voterIds);
         this.random = random;
-
-        for (Integer voterId: voterIds) {
-            ConnectionState connection = new ConnectionState(voterId);
-            connections.put(voterId, connection);
-        }
-    }
-
-    public ConnectionState getOrCreate(int id) {
-        return connections.computeIfAbsent(id, key -> new ConnectionState(id));
     }
 
-    public OptionalInt findReadyVoter(long currentTimeMs) {
-        int startIndex = random.nextInt(voters.size());
-        OptionalInt res = OptionalInt.empty();
-        for (int i = 0; i < voters.size(); i++) {
-            int index = (startIndex + i) % voters.size();
-            Integer voterId = voters.get(index);
-            ConnectionState connection = connections.get(voterId);
-            boolean isReady = connection.isReady(currentTimeMs);
+    public Optional<Node> findReadyBootstrapServer(long currentTimeMs) {
+        int startIndex = random.nextInt(bootstrapServers.size());
+        Optional<Node> res = Optional.empty();
+        for (int i = 0; i < bootstrapServers.size(); i++) {
+            int index = (startIndex + i) % bootstrapServers.size();
+            Node node = bootstrapServers.get(index);
 
-            if (isReady) {
-                res = OptionalInt.of(voterId);
-            } else if (connection.inFlightCorrelationId.isPresent()) {
-                res = OptionalInt.empty();
+            if (isReady(node, currentTimeMs)) {
+                res = Optional.of(node);
+            } else if (hasInflightRequest(node, currentTimeMs)) {
+                res = Optional.empty();
                 break;
             }
         }
+
         return res;
     }
 
-    public long backoffBeforeAvailableVoter(long currentTimeMs) {
-        long minBackoffMs = Long.MAX_VALUE;
-        for (Integer voterId : voters) {
-            ConnectionState connection = connections.get(voterId);
-            if (connection.isReady(currentTimeMs)) {
+    public long backoffBeforeAvailableBootstrapServer(long currentTimeMs) {
+        long minBackoffMs = Math.max(retryBackoffMs, requestTimeoutMs);
+        for (Node node : bootstrapServers) {
+            if (isReady(node, currentTimeMs)) {
                 return 0L;
-            } else if (connection.isBackingOff(currentTimeMs)) {
-                minBackoffMs = Math.min(minBackoffMs, connection.remainingBackoffMs(currentTimeMs));
+            } else if (isBackingOff(node, currentTimeMs)) {
+                minBackoffMs = Math.min(minBackoffMs, remainingBackoffMs(node, currentTimeMs));
             } else {
-                minBackoffMs = Math.min(minBackoffMs, connection.remainingRequestTimeMs(currentTimeMs));
+                minBackoffMs = Math.min(minBackoffMs, remainingRequestTimeMs(node, currentTimeMs));
             }
         }
+
         return minBackoffMs;
     }
 
+    public boolean hasRequestTimedOut(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return false;
+        }
+
+        return state.hasRequestTimedOut(timeMs);
+    }
+
+    public boolean isReady(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return true;
+        }
+
+        boolean ready = state.isReady(timeMs);
+        if (ready) {
+            reset(node);
+        }
+
+        return ready;
+    }
+
+    public boolean isBackingOff(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return false;
+        }
+
+        return state.isBackingOff(timeMs);
+    }
+
+    public long remainingRequestTimeMs(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return 0;
+        }
+
+        return state.remainingRequestTimeMs(timeMs);
+    }
+
+    public long remainingBackoffMs(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return 0;
+        }
+
+        return  state.remainingBackoffMs(timeMs);
+    }
+
+    public boolean isResponseExpected(Node node, long correlationId) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return false;
+        }
+
+        return state.isResponseExpected(correlationId);
+    }
+
+    public void onResponseReceived(Node node, long correlationId) {

Review Comment:
   Sounds good. Went with `onResponseResult(Node, long, boolean, long)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to