hachikuji commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1610398594


##########
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##########
@@ -118,6 +120,10 @@ class RaftManagerTest {
       new Metrics(Time.SYSTEM),
       Option.empty,
       
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
+      config.quorumBootstrapServers

Review Comment:
   nit: we seem to have this same little snippet in a few places. Is there 
somewhere we could add a helper?



##########
core/src/test/resources/log4j.properties:
##########
@@ -20,6 +20,8 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m 
(%c:%L)%n
 
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
+# TODO; remove this line

Review Comment:
   Reminder about TODO



##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -231,4 +266,26 @@ public String toString() {
             return "non-empty list";
         }
     }
+
+    public static class ControllerQuorumBootstrapServersValidator implements 
ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                throw new ConfigException(name, null);
+            }
+
+            @SuppressWarnings("unchecked")
+            List<String> entries = (List<String>) value;
+
+            // Attempt to parse the connect strings
+            for (String entry : entries) {
+                QuorumConfig.parseBootstrapServer(entry);

Review Comment:
   nit: drop unnecessary `QuorumConfig` prefix?



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1366,6 +1368,26 @@ class KafkaConfigTest {
     assertEquals(expectedVoters, addresses)
   }
 
+  @Test
+  def testParseQuorumBootstrapServers(): Unit = {

Review Comment:
   Perhaps we should have some invalid test cases as well?



##########
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##########
@@ -17,108 +17,196 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
 public class RequestManager {
-    private final Map<Integer, ConnectionState> connections = new HashMap<>();
-    private final List<Integer> voters = new ArrayList<>();
+    private final Map<String, ConnectionState> connections = new HashMap<>();
+    private final ArrayList<Node> bootstrapServers;
 
     private final int retryBackoffMs;
     private final int requestTimeoutMs;
     private final Random random;
 
-    public RequestManager(Set<Integer> voterIds,
-                          int retryBackoffMs,
-                          int requestTimeoutMs,
-                          Random random) {
-
+    public RequestManager(
+        Collection<Node> bootstrapServers,
+        int retryBackoffMs,
+        int requestTimeoutMs,
+        Random random
+    ) {
+        this.bootstrapServers = new ArrayList<>(bootstrapServers);
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
-        this.voters.addAll(voterIds);
         this.random = random;
-
-        for (Integer voterId: voterIds) {
-            ConnectionState connection = new ConnectionState(voterId);
-            connections.put(voterId, connection);
-        }
-    }
-
-    public ConnectionState getOrCreate(int id) {
-        return connections.computeIfAbsent(id, key -> new ConnectionState(id));
     }
 
-    public OptionalInt findReadyVoter(long currentTimeMs) {
-        int startIndex = random.nextInt(voters.size());
-        OptionalInt res = OptionalInt.empty();
-        for (int i = 0; i < voters.size(); i++) {
-            int index = (startIndex + i) % voters.size();
-            Integer voterId = voters.get(index);
-            ConnectionState connection = connections.get(voterId);
-            boolean isReady = connection.isReady(currentTimeMs);
+    public Optional<Node> findReadyBootstrapServer(long currentTimeMs) {
+        int startIndex = random.nextInt(bootstrapServers.size());
+        Optional<Node> res = Optional.empty();
+        for (int i = 0; i < bootstrapServers.size(); i++) {
+            int index = (startIndex + i) % bootstrapServers.size();
+            Node node = bootstrapServers.get(index);
 
-            if (isReady) {
-                res = OptionalInt.of(voterId);
-            } else if (connection.inFlightCorrelationId.isPresent()) {
-                res = OptionalInt.empty();
+            if (isReady(node, currentTimeMs)) {
+                res = Optional.of(node);
+            } else if (hasInflightRequest(node, currentTimeMs)) {
+                res = Optional.empty();
                 break;
             }
         }
+
         return res;
     }
 
-    public long backoffBeforeAvailableVoter(long currentTimeMs) {
-        long minBackoffMs = Long.MAX_VALUE;
-        for (Integer voterId : voters) {
-            ConnectionState connection = connections.get(voterId);
-            if (connection.isReady(currentTimeMs)) {
+    public long backoffBeforeAvailableBootstrapServer(long currentTimeMs) {
+        long minBackoffMs = Math.max(retryBackoffMs, requestTimeoutMs);
+        for (Node node : bootstrapServers) {
+            if (isReady(node, currentTimeMs)) {
                 return 0L;
-            } else if (connection.isBackingOff(currentTimeMs)) {
-                minBackoffMs = Math.min(minBackoffMs, 
connection.remainingBackoffMs(currentTimeMs));
+            } else if (isBackingOff(node, currentTimeMs)) {
+                minBackoffMs = Math.min(minBackoffMs, remainingBackoffMs(node, 
currentTimeMs));
             } else {
-                minBackoffMs = Math.min(minBackoffMs, 
connection.remainingRequestTimeMs(currentTimeMs));
+                minBackoffMs = Math.min(minBackoffMs, 
remainingRequestTimeMs(node, currentTimeMs));
             }
         }
+
         return minBackoffMs;
     }
 
+    public boolean hasRequestTimedOut(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return false;
+        }
+
+        return state.hasRequestTimedOut(timeMs);
+    }
+
+    public boolean isReady(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return true;
+        }
+
+        boolean ready = state.isReady(timeMs);
+        if (ready) {
+            reset(node);
+        }
+
+        return ready;
+    }
+
+    public boolean isBackingOff(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return false;
+        }
+
+        return state.isBackingOff(timeMs);
+    }
+
+    public long remainingRequestTimeMs(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return 0;
+        }
+
+        return state.remainingRequestTimeMs(timeMs);
+    }
+
+    public long remainingBackoffMs(Node node, long timeMs) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return 0;
+        }
+
+        return  state.remainingBackoffMs(timeMs);
+    }
+
+    public boolean isResponseExpected(Node node, long correlationId) {
+        ConnectionState state = connections.get(node.idString());
+        if (state == null) {
+            return false;
+        }
+
+        return state.isResponseExpected(correlationId);
+    }
+
+    public void onResponseReceived(Node node, long correlationId) {

Review Comment:
   I wonder if we could consolidate `onResponseReceived` and `onResponseError` 
into a single method, perhaps named something like `notifyResponseResult`.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -393,17 +422,35 @@ public void initialize(
         logger.info("Reading KRaft snapshot and log as part of the 
initialization");
         partitionState.updateState();
 
-        VoterSet lastVoterSet = partitionState.lastVoterSet();
-        requestManager = new RequestManager(
-            lastVoterSet.voterIds(),
-            quorumConfig.retryBackoffMs(),
-            quorumConfig.requestTimeoutMs(),
-            random
-        );
+        if (requestManager == null) {
+            // The request manager wasn't created using the bootstrap servers
+            // create it using the voters static configuration
+            List<Node> bootstrapNodes = voterAddresses
+                .entrySet()
+                .stream()
+                .map(entry ->
+                    new Node(
+                        entry.getKey(),
+                        entry.getValue().getHostString(),
+                        entry.getValue().getPort()
+                    )
+                )
+                .collect(Collectors.toList());
+
+            logger.info("Starting request manager with bootstrap servers: {}", 
bootstrapNodes);

Review Comment:
   Perhaps we change this message to something like "Starting request manager 
with static voter set: {}"



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##########
@@ -53,15 +56,41 @@ final public class VoterSet {
     }
 
     /**
-     * Returns the socket address for a given voter at a given listener.
+     * Returns the node information for all the given voter ids and listener.
      *
-     * @param voter the id of the voter
-     * @param listener the name of the listener
-     * @return the socket address if it exists, otherwise {@code 
Optional.empty()}
+     * @param voterIds the id of the voters

Review Comment:
   nit: ids of the voters



##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -199,6 +206,34 @@ private static Map<Integer, InetSocketAddress> 
parseVoterConnections(
         return voterMap;
     }
 
+    public static InetSocketAddress parseBootstrapServer(String 
bootstrapServer) {
+        String host = Utils.getHost(bootstrapServer);
+        if (host == null) {
+            throw new ConfigException(
+                String.format(
+                    "Failed to parse host name from {} for the configuration 
{}. Each " +
+                    "entry should be in the form \"{host}:{port}\"",
+                    bootstrapServer,
+                    QUORUM_BOOTSTRAP_SERVERS_CONFIG
+                )
+            );
+        }
+
+        Integer port = Utils.getPort(bootstrapServer);
+        if (port == null) {
+            throw new ConfigException(
+                String.format(
+                    "Failed to parse host port from {} for the configuration 
{}. Each " +
+                    "entry should be in the form \"{host}:{port}\"",
+                    bootstrapServer,
+                    QUORUM_BOOTSTRAP_SERVERS_CONFIG
+                )
+            );
+        }
+
+        return InetSocketAddress.createUnresolved(host, port);

Review Comment:
   Is this going to give us a nice exception message if the host is invalid?



##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -29,40 +30,42 @@
 public class FollowerState implements EpochState {
     private final int fetchTimeoutMs;
     private final int epoch;
-    private final int leaderId;
+    private final Node leader;
     private final Set<Integer> voters;
     // Used for tracking the expiration of both the Fetch and FetchSnapshot 
requests
     private final Timer fetchTimer;
     private Optional<LogOffsetMetadata> highWatermark;
     /* Used to track the currently fetching snapshot. When fetching snapshot 
regular
      * Fetch request are paused
      */
-    private Optional<RawSnapshotWriter> fetchingSnapshot;
+    private Optional<RawSnapshotWriter> fetchingSnapshot = Optional.empty();
+
+    // TODO: remove this when done debuggin CI failures

Review Comment:
   TODO reminder



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1234,7 +1286,22 @@ private boolean handleFetchResponse(
             } else {
                 Records records = 
FetchResponse.recordsOrFail(partitionResponse);
                 if (records.sizeInBytes() > 0) {
-                    appendAsFollower(records);
+                    // TODO: remove all of this code when done debugging

Review Comment:
   TODO reminder



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to