Re: [PR] KAFKA-16252: Dynamic KRaft network manager and channel [kafka]

2024-06-04 Thread via GitHub


ahuang98 commented on PR #16160:
URL: https://github.com/apache/kafka/pull/16160#issuecomment-2148481736

   Yeah looks like those are existing failures. Looks like Jose didn't take any 
of the changes, closing this out.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252: Dynamic KRaft network manager and channel [kafka]

2024-06-04 Thread via GitHub


ahuang98 closed pull request #16160: KAFKA-16252: Dynamic KRaft network manager 
and channel
URL: https://github.com/apache/kafka/pull/16160





Re: [PR] KAFKA-16252: Dynamic KRaft network manager and channel [kafka]

2024-06-02 Thread via GitHub


cmccabe commented on PR #16160:
URL: https://github.com/apache/kafka/pull/16160#issuecomment-2143980581

   There are some test failures in `KafkaRaftClientTest` and `StorageToolTest`.





Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-30 Thread via GitHub


showuon commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1620608088


##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -17,108 +17,296 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
+/**
+ * The request manager keeps track of the connection with remote replicas.
+ *
+ * When sending a request update this type by calling {@code onRequestSent(Node, long, long)}. When
+ * the RPC returns a response, update this manager with {@code onResponseResult(Node, long, boolean, long)}.
+ *
+ * Connections start in the ready state ({@code isReady(Node, long)} returns true).
+ *
+ * When a request times out or completes successfully the connection will transition back to the
+ * ready state.
+ *
+ * When a request completes with an error it still transitions to the backoff state until
+ * {@code retryBackoffMs}.
+ */
 public class RequestManager {
-private final Map<Integer, ConnectionState> connections = new HashMap<>();
-private final List<Integer> voters = new ArrayList<>();
+private final Map<String, ConnectionState> connections = new HashMap<>();
+private final ArrayList<Node> bootstrapServers;
 
 private final int retryBackoffMs;
 private final int requestTimeoutMs;
 private final Random random;
 
-public RequestManager(Set<Integer> voterIds,
-  int retryBackoffMs,
-  int requestTimeoutMs,
-  Random random) {
-
+public RequestManager(
+Collection<Node> bootstrapServers,
+int retryBackoffMs,
+int requestTimeoutMs,
+Random random
+) {
+this.bootstrapServers = new ArrayList<>(bootstrapServers);
 this.retryBackoffMs = retryBackoffMs;
 this.requestTimeoutMs = requestTimeoutMs;
-this.voters.addAll(voterIds);
 this.random = random;
+}
 
-for (Integer voterId: voterIds) {
-ConnectionState connection = new ConnectionState(voterId);
-connections.put(voterId, connection);
+/**
+ * Returns true if there is any connection with pending requests.
+ *
+ * This is useful for satisfying the invariant that there is only one pending Fetch request.
+ * If there is more than one pending Fetch request, it is possible for the follower to write
+ * the same offset twice.
+ *
+ * @param currentTimeMs the current time
+ * @return true if the request manager is tracking at least one request
+ */
+public boolean hasAnyInflightRequest(long currentTimeMs) {
+boolean result = false;
+
+Iterator<ConnectionState> iterator = connections.values().iterator();
+while (iterator.hasNext()) {
+ConnectionState connection = iterator.next();
+if (connection.hasRequestTimedOut(currentTimeMs)) {
+// Mark the node as ready after request timeout
+iterator.remove();
+} else if (connection.isBackoffComplete(currentTimeMs)) {
+// Mark the node as ready after completed backoff
+iterator.remove();
+} else if (connection.hasInflightRequest(currentTimeMs)) {

Review Comment:
   It looks like `hasInflightRequest` also checks `hasRequestTimedOut`. We could 
consider a small optimization here in the future.
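A minimal sketch of the optimization suggested above, assuming a simplified connection model: `SketchConnection` and `InflightCheck` are hypothetical names, not Kafka classes, and the state is reduced to a send time and a failure time. Each connection is classified once per call, so the request-timeout comparison is not repeated inside a separate in-flight check.

```java
import java.util.Collection;
import java.util.Iterator;

// Hypothetical, simplified stand-in for RequestManager's ConnectionState; not Kafka's class.
final class SketchConnection {
    enum Status { READY, IN_FLIGHT, BACKING_OFF }

    long requestSentMs = -1; // -1 means no request is in flight
    long failedAtMs = -1;    // -1 means the connection is not backing off

    // Classifies the connection once, so the timeout check is evaluated a single time per call.
    Status status(long nowMs, long requestTimeoutMs, long retryBackoffMs) {
        if (requestSentMs >= 0) {
            return nowMs - requestSentMs >= requestTimeoutMs ? Status.READY : Status.IN_FLIGHT;
        }
        if (failedAtMs >= 0) {
            return nowMs - failedAtMs >= retryBackoffMs ? Status.READY : Status.BACKING_OFF;
        }
        return Status.READY;
    }
}

final class InflightCheck {
    // Removes connections that have become ready again and reports whether any request
    // is still in flight, stopping at the first one found (as the quoted loop does).
    static boolean hasAnyInflightRequest(
        Collection<SketchConnection> connections,
        long nowMs,
        long requestTimeoutMs,
        long retryBackoffMs
    ) {
        Iterator<SketchConnection> iterator = connections.iterator();
        while (iterator.hasNext()) {
            SketchConnection.Status status =
                iterator.next().status(nowMs, requestTimeoutMs, retryBackoffMs);
            if (status == SketchConnection.Status.READY) {
                iterator.remove();
            } else if (status == SketchConnection.Status.IN_FLIGHT) {
                return true;
            }
        }
        return false;
    }
}
```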






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-30 Thread via GitHub


showuon commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1620562119


##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -17,108 +17,296 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
+/**
+ * The request manager keeps track of the connection with remote replicas.
+ *
+ * When sending a request update this type by calling {@code onRequestSent(Node, long, long)}. When
+ * the RPC returns a response, update this manager with {@code onResponseResult(Node, long, boolean, long)}.
+ *
+ * Connections start in the ready state ({@code isReady(Node, long)} returns true).
+ *
+ * When a request times out or completes successfully the connection will transition back to the
+ * ready state.
+ *
+ * When a request completes with an error it still transitions to the backoff state until
+ * {@code retryBackoffMs}.
+ */
 public class RequestManager {
-private final Map<Integer, ConnectionState> connections = new HashMap<>();
-private final List<Integer> voters = new ArrayList<>();
+private final Map<String, ConnectionState> connections = new HashMap<>();
+private final ArrayList<Node> bootstrapServers;
 
 private final int retryBackoffMs;
 private final int requestTimeoutMs;
 private final Random random;
 
-public RequestManager(Set<Integer> voterIds,
-  int retryBackoffMs,
-  int requestTimeoutMs,
-  Random random) {
-
+public RequestManager(
+Collection<Node> bootstrapServers,
+int retryBackoffMs,
+int requestTimeoutMs,
+Random random
+) {
+this.bootstrapServers = new ArrayList<>(bootstrapServers);
 this.retryBackoffMs = retryBackoffMs;
 this.requestTimeoutMs = requestTimeoutMs;
-this.voters.addAll(voterIds);
 this.random = random;
+}
 
-for (Integer voterId: voterIds) {
-ConnectionState connection = new ConnectionState(voterId);
-connections.put(voterId, connection);
+/**
+ * Returns true if there is any connection with pending requests.
+ *
+ * This is useful for satisfying the invariant that there is only one pending Fetch request.
+ * If there is more than one pending Fetch request, it is possible for the follower to write
+ * the same offset twice.
+ *
+ * @param currentTimeMs the current time
+ * @return true if the request manager is tracking at least one request
+ */
+public boolean hasAnyInflightRequest(long currentTimeMs) {
+boolean result = false;
+
+Iterator<ConnectionState> iterator = connections.values().iterator();
+while (iterator.hasNext()) {
+ConnectionState connection = iterator.next();
+if (connection.hasRequestTimedOut(currentTimeMs)) {
+// Mark the node as ready after request timeout
+iterator.remove();
+} else if (connection.isBackoffComplete(currentTimeMs)) {
+// Mark the node as ready after completed backoff
+iterator.remove();
+} else if (connection.hasInflightRequest(currentTimeMs)) {
+// If there is at least one inflight request, it is enough
+// to stop checking the rest of the connections
+result = true;
+break;
+}
 }
-}
 
-public ConnectionState getOrCreate(int id) {
-return connections.computeIfAbsent(id, key -> new ConnectionState(id));
+return result;
 }
 
-public OptionalInt findReadyVoter(long currentTimeMs) {
-int startIndex = random.nextInt(voters.size());
-OptionalInt res = OptionalInt.empty();
-for (int i = 0; i < voters.size(); i++) {
-int index = (startIndex + i) % voters.size();
-Integer voterId = voters.get(index);
-ConnectionState connection = connections.get(voterId);
-boolean isReady = connection.isReady(currentTimeMs);
+/**
+ * Returns a random bootstrap node that is ready to receive a request.
+ *
+ * This method doesn't return a node if there is at least one request pending. In general this
+ * method is used to send Fetch requests. Fetch requests have the invariant that there can
+ * only be one pending Fetch request for the LEO.
+ *
+ * @param currentTimeMs the current time
+ * @return a random ready bootstrap node
+ */
+public Optional<Node> findReadyBootstrapServer(long currentTimeMs) {
+// Check that there are no inflight requests across any of the known nodes, not just
+// the bootstrap servers
+if 

Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on PR #15986:
URL: https://github.com/apache/kafka/pull/15986#issuecomment-2138338743

   Thanks for the PR, @jsancio . A few meta-comments:
   
   - I'm not sure I see the benefit of changing the `toString` functions to use 
`String.format`. It seems more brittle than the simple string concatenation 
approach, unless you want to print something in a specific way, like 
`String.format("%02d", myInt)`. But that isn't the case here.
   
   - Changing the SharedServer constructor is a huge pain and generates a lot 
of churn. Since the thing you're passing comes from the static configuration 
anyway, let's not do that.
   
   - I think we should try to avoid doing too much validation in `KafkaConfig`. 
Things like hostnames should be resolved when we actually need them. It would 
be silly for one unresolvable hostname to make configuration validation fail, 
and hence fail the whole controller startup process.
   
   - I don't think we want to change all of the tests to use 
`controller.quorum.bootstrap.servers`. We still need to support the old 
configuration. Let's make it so that tests using IBP_3_8_IV0 or newer use the 
new thing, and tests using an older MV use the old configuration. That way we 
will have good coverage.
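The last suggestion could look roughly like the helper below. It is a hypothetical sketch: `QuorumTestConfig` is not a Kafka test utility, and the `atLeast38IV0` flag stands in for comparing the test's metadata version against `IBP_3_8_IV0`. Only the two property names and their value formats (`host:port` for bootstrap servers, `id@host:port` for voters) come from Kafka's configuration.

```java
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical test helper; the boolean stands in for "metadata version >= IBP_3_8_IV0".
final class QuorumTestConfig {
    static Properties quorumProperties(boolean atLeast38IV0, List<Integer> ids, List<String> hostPorts) {
        if (ids.size() != hostPorts.size()) {
            throw new IllegalArgumentException("ids and hostPorts must have the same size");
        }
        Properties props = new Properties();
        if (atLeast38IV0) {
            // New style: plain host:port entries, no node ids.
            props.setProperty("controller.quorum.bootstrap.servers", String.join(",", hostPorts));
        } else {
            // Old style: id@host:port entries.
            String voters = IntStream.range(0, ids.size())
                .mapToObj(i -> ids.get(i) + "@" + hostPorts.get(i))
                .collect(Collectors.joining(","));
            props.setProperty("controller.quorum.voters", voters);
        }
        return props;
    }
}
```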





Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619507872


##
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##
@@ -156,22 +156,24 @@ public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
 log.debug(
 "Rejecting vote request from candidate ({}) since we already have a leader {} in epoch {}",
 candidateKey,
-leaderId(),
+leader,
 epoch
 );
 return false;
 }
 
 @Override
 public String toString() {
-return "FollowerState(" +
-"fetchTimeoutMs=" + fetchTimeoutMs +
-", epoch=" + epoch +
-", leaderId=" + leaderId +
-", voters=" + voters +
-", highWatermark=" + highWatermark +
-", fetchingSnapshot=" + fetchingSnapshot +
-')';
+return String.format(

Review Comment:
   Hmm, I'm not sure I see a lot of benefit to this change.
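For comparison, a minimal sketch with a hypothetical, trimmed-down `FollowerStateSketch` (not the real `FollowerState`) shows that both styles produce the same string. With concatenation each label sits next to its value; with `String.format` the labels and values are two position-aligned lists, where a missing argument fails only at runtime and an extra one is silently ignored.

```java
// Hypothetical, trimmed-down state class; only the toString style matters here.
final class FollowerStateSketch {
    private final long fetchTimeoutMs;
    private final int epoch;
    private final int leaderId;

    FollowerStateSketch(long fetchTimeoutMs, int epoch, int leaderId) {
        this.fetchTimeoutMs = fetchTimeoutMs;
        this.epoch = epoch;
        this.leaderId = leaderId;
    }

    // Concatenation: each label is written right next to the value it describes.
    @Override
    public String toString() {
        return "FollowerStateSketch(" +
            "fetchTimeoutMs=" + fetchTimeoutMs +
            ", epoch=" + epoch +
            ", leaderId=" + leaderId +
            ')';
    }

    // Equivalent String.format version: placeholders and arguments must stay aligned by position.
    String formatted() {
        return String.format(
            "FollowerStateSketch(fetchTimeoutMs=%d, epoch=%d, leaderId=%d)",
            fetchTimeoutMs, epoch, leaderId
        );
    }
}
```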






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506788


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -21,12 +21,12 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}

Review Comment:
   It would be better to do this in a separate cleanup PR.



##
core/src/test/resources/log4j.properties:
##
@@ -21,6 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m 
(%c:%L)%n
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
 
-

Review Comment:
   It would be better to do this in a separate cleanup PR.






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619520419


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -55,15 +57,41 @@ final public class VoterSet {
 }
 
 /**
- * Returns the socket address for a given voter at a given listener.
+ * Returns the node information for all the given voter ids and listener.
  *
- * @param voter the id of the voter
- * @param listener the name of the listener
- * @return the socket address if it exists, otherwise {@code Optional.empty()}
+ * @param voterIds the ids of the voters
+ * @param listenerName the name of the listener
+ * @return the node information for all of the voter ids
+ * @throws IllegalArgumentException if there are missing endpoints
  */
-public Optional<InetSocketAddress> voterAddress(int voter, String listener) {
-return Optional.ofNullable(voters.get(voter))
-.flatMap(voterNode -> voterNode.address(listener));
+public Set<Node> voterNodes(Stream<Integer> voterIds, ListenerName listenerName) {

Review Comment:
   This seems like the wrong behavior to me for the case where one of the 
voters doesn't have a specific listener, but the other ones do. Wouldn't we 
want to continue to use the other, working voters in that case?
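One way to get the behavior suggested here is to skip voters that lack the listener instead of throwing. The sketch below is hypothetical: `VoterEndpoints` models the voter set as a plain map from voter id to listener endpoints and does not use Kafka's `VoterSet`, `Node`, or `ListenerName` types.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical stand-in for VoterSet: voter id -> (listener name -> endpoint).
final class VoterEndpoints {
    private final Map<Integer, Map<String, InetSocketAddress>> voters;

    VoterEndpoints(Map<Integer, Map<String, InetSocketAddress>> voters) {
        this.voters = voters;
    }

    // Empty if the voter is unknown or does not expose the listener.
    Optional<InetSocketAddress> endpoint(int voterId, String listener) {
        return Optional.ofNullable(voters.get(voterId)).map(listeners -> listeners.get(listener));
    }

    // Keeps the voters that expose the listener and skips the rest instead of throwing.
    Map<Integer, InetSocketAddress> reachableVoters(Stream<Integer> voterIds, String listener) {
        Map<Integer, InetSocketAddress> result = new HashMap<>();
        voterIds.forEach(id -> endpoint(id, listener).ifPresent(address -> result.put(id, address)));
        return result;
    }
}
```

The caller can still log which voter ids were skipped, so a misconfigured voter stays visible without blocking the working ones.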






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619518236


##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -17,108 +17,296 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
+/**
+ * The request manager keeps track of the connection with remote replicas.
+ *
+ * When sending a request update this type by calling {@code onRequestSent(Node, long, long)}. When
+ * the RPC returns a response, update this manager with {@code onResponseResult(Node, long, boolean, long)}.
+ *
+ * Connections start in the ready state ({@code isReady(Node, long)} returns true).
+ *
+ * When a request times out or completes successfully the connection will transition back to the
+ * ready state.
+ *
+ * When a request completes with an error it still transitions to the backoff state until
+ * {@code retryBackoffMs}.
+ */
 public class RequestManager {
-private final Map<Integer, ConnectionState> connections = new HashMap<>();
-private final List<Integer> voters = new ArrayList<>();
+private final Map<String, ConnectionState> connections = new HashMap<>();

Review Comment:
   I don't understand why you are changing this to treat node ID as a string. 
An Int seems better.
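A minimal sketch of keying connection state by the numeric id instead of its string form. Kafka's `Node` exposes both `id()` and `idString()`; the sketch avoids Kafka types, and `ConnectionTable` plus its negative-id scheme for bootstrap entries are assumptions for illustration, chosen so that placeholder ids cannot collide with real non-negative node ids.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: connection state keyed by the int node id rather than its string form.
final class ConnectionTable {
    private final Map<Integer, Long> inflightSentMs = new HashMap<>(); // node id -> send time

    // Assumption for illustration: bootstrap entries get distinct negative ids
    // so they cannot collide with real (non-negative) node ids.
    static int bootstrapId(int index) {
        return -(index + 1);
    }

    void onRequestSent(int nodeId, long nowMs) { inflightSentMs.put(nodeId, nowMs); }

    void onResponse(int nodeId) { inflightSentMs.remove(nodeId); }

    boolean hasInflight(int nodeId) { return inflightSentMs.containsKey(nodeId); }

    int size() { return inflightSentMs.size(); }
}
```

Keying by id alone does not prevent one leader from being tracked under both its real id and a bootstrap id.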






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619514531


##
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##
@@ -199,6 +206,34 @@ private static Map parseVoterConnections(
 return voterMap;
 }
 
+public static InetSocketAddress parseBootstrapServer(String bootstrapServer) {
+String host = Utils.getHost(bootstrapServer);
+if (host == null) {
+throw new ConfigException(
+String.format(
+"Failed to parse host name from %s for the configuration %s. Each " +
+"entry should be in the form \"{host}:{port}\"",
+bootstrapServer,
+QUORUM_BOOTSTRAP_SERVERS_CONFIG
+)
+);
+}
+
+Integer port = Utils.getPort(bootstrapServer);
+if (port == null) {
+throw new ConfigException(
+String.format(
+"Failed to parse host port from %s for the configuration %s. Each " +
+"entry should be in the form \"{host}:{port}\"",
+bootstrapServer,
+QUORUM_BOOTSTRAP_SERVERS_CONFIG
+)
+);
+}
+
+return InetSocketAddress.createUnresolved(host, port);

Review Comment:
   I think it's a mistake to resolve hostnames as part of configuration 
validation. Just treat them as strings.
   
   Otherwise you could end up not being able to start Kafka because 1 out of 3 
hostnames in your bootstrap list could not be resolved (because of a DNS 
problem or whatever).
   
   Resolve a hostname when you're actually using it, not before.
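A sketch of "resolve when you use it", assuming the configuration layer only records host and port. `LazyResolution` is a hypothetical helper; the JDK behavior it relies on is that `InetSocketAddress.createUnresolved` performs no lookup, while the `InetSocketAddress(String, int)` constructor attempts one and, on failure, returns an address flagged as unresolved rather than throwing.

```java
import java.net.InetSocketAddress;
import java.util.Optional;

// Hypothetical helper, not part of Kafka.
final class LazyResolution {
    // Configuration time: keep only host and port; createUnresolved performs no DNS lookup.
    static InetSocketAddress configured(String host, int port) {
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Connection time: attempt resolution for this one entry. A failed lookup yields an
    // unresolved address instead of an exception, so the caller can skip this server.
    static Optional<InetSocketAddress> resolveForConnect(InetSocketAddress configured) {
        InetSocketAddress resolved =
            new InetSocketAddress(configured.getHostString(), configured.getPort());
        return resolved.isUnresolved() ? Optional.empty() : Optional.of(resolved);
    }
}
```

A caller iterating over the bootstrap list would treat an empty result as one unavailable server and move on to the next entry, so one bad DNS name does not block startup.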






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619511561


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1635,24 +1693,29 @@ private Optional maybeHandleCommonResponse(
 }
 
 private void maybeTransition(
-OptionalInt leaderId,
+Optional leader,
 int epoch,
 long currentTimeMs
 ) {
+OptionalInt leaderId = OptionalInt.empty();

Review Comment:
   Again, this would be more clearly expressed with `?:` rather than by mutating the 
`OptionalInt` reference.
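A sketch of the single-expression form for the `Optional` to `OptionalInt` direction. `LeaderNode` is a hypothetical stand-in for Kafka's `Node`, and `LeaderIds.leaderId` is an illustrative name, not a method in `KafkaRaftClient`.

```java
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical stand-in for org.apache.kafka.common.Node; only the id matters here.
final class LeaderNode {
    private final int id;

    LeaderNode(int id) {
        this.id = id;
    }

    int id() {
        return id;
    }
}

final class LeaderIds {
    // Computed in one expression instead of assigning OptionalInt.empty() and overwriting it.
    static OptionalInt leaderId(Optional<LeaderNode> leader) {
        return leader.isPresent()
            ? OptionalInt.of(leader.get().id())
            : OptionalInt.empty();
    }
}
```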






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619510911


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1593,6 +1644,13 @@ private Optional maybeHandleCommonResponse(
 int epoch,
 long currentTimeMs
 ) {
+Optional leader = Optional.empty();

Review Comment:
   This would be more clearly expressed as something like 
   ```
   Optional<Node> leader = leaderId.isPresent() ?
       partitionState.lastVoterSet().voterNode(leaderId.getAsInt(), channel.listenerName()) :
       Optional.empty();
   ```






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619507872


##
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##
@@ -156,22 +156,24 @@ public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
 log.debug(
 "Rejecting vote request from candidate ({}) since we already have a leader {} in epoch {}",
 candidateKey,
-leaderId(),
+leader,
 epoch
 );
 return false;
 }
 
 @Override
 public String toString() {
-return "FollowerState(" +
-"fetchTimeoutMs=" + fetchTimeoutMs +
-", epoch=" + epoch +
-", leaderId=" + leaderId +
-", voters=" + voters +
-", highWatermark=" + highWatermark +
-", fetchingSnapshot=" + fetchingSnapshot +
-')';
+return String.format(

Review Comment:
   I'm not sure I see a lot of benefit to this change.






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506788


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -21,12 +21,12 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}

Review Comment:
   Do this in a separate cleanup PR.



##
core/src/test/resources/log4j.properties:
##
@@ -21,6 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m 
(%c:%L)%n
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
 
-

Review Comment:
   Do this in a separate cleanup PR.






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506439


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -94,6 +95,7 @@ class SharedServer(
   val time: Time,
   private val _metrics: Metrics,
  val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
+  val bootstrapServers: JCollection[InetSocketAddress],

Review Comment:
   I don't see why this needs to be a new argument. We already have KafkaConfig 
passed in as an argument, and this just comes from 
`QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers)`.
   
   Adding a new argument here creates a lot of churn, which is a lot of work 
for merges and such.






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-27 Thread via GitHub


jsancio commented on PR #15986:
URL: https://github.com/apache/kafka/pull/15986#issuecomment-2134135719

   @hachikuji, thanks for the review. The PR is ready for another round.
   
   The core of the issue that I fixed was that KRaft was sending two Fetch 
requests for the same LEO. One of the Fetch requests was going to the leader 
using the Node with an id greater than or equal to 0. The second Fetch request 
was going to the leader using the Node in the bootstrap server list which had a 
negative id.
   
   I fixed the issue by updating the `RequestManager` and `KafkaRaftClient` to 
only allow one outstanding (Fetch) request.
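The invariant described here can be sketched as follows. This is a simplified, hypothetical model (`SingleFetchManager` is not Kafka's `RequestManager`, and timeouts and backoff are left out): a bootstrap server is offered only when no request is in flight to any tracked node, so a Fetch already sent to the leader under its real id blocks a second Fetch to the same leader under a negative bootstrap id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

// Hypothetical sketch of the single-outstanding-request invariant; timeouts and backoff are omitted.
final class SingleFetchManager {
    private final Map<Integer, Long> inflight = new HashMap<>(); // node id -> send time
    private final List<Integer> bootstrapIds;
    private final Random random;

    SingleFetchManager(List<Integer> bootstrapIds, Random random) {
        this.bootstrapIds = new ArrayList<>(bootstrapIds);
        this.random = random;
    }

    void onRequestSent(int nodeId, long nowMs) { inflight.put(nodeId, nowMs); }

    void onResponse(int nodeId) { inflight.remove(nodeId); }

    // Refuses to pick a bootstrap server while a request is in flight to ANY node,
    // including the leader tracked under its real id, so the same LEO is not fetched twice.
    Optional<Integer> findReadyBootstrapServer() {
        if (!inflight.isEmpty() || bootstrapIds.isEmpty()) {
            return Optional.empty();
        }
        // A random choice spreads requests across the bootstrap servers.
        return Optional.of(bootstrapIds.get(random.nextInt(bootstrapIds.size())));
    }
}
```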


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-26 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1615318177


##
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##
@@ -199,6 +206,34 @@ private static Map parseVoterConnections(
 return voterMap;
 }
 
+public static InetSocketAddress parseBootstrapServer(String bootstrapServer) {
+String host = Utils.getHost(bootstrapServer);
+if (host == null) {
+throw new ConfigException(
+String.format(
+"Failed to parse host name from %s for the configuration %s. Each " +
+"entry should be in the form \"{host}:{port}\"",
+bootstrapServer,
+QUORUM_BOOTSTRAP_SERVERS_CONFIG
+)
+);
+}
+
+Integer port = Utils.getPort(bootstrapServer);
+if (port == null) {
+throw new ConfigException(
+String.format(
+"Failed to parse host port from %s for the configuration %s. Each " +
+"entry should be in the form \"{host}:{port}\"",
+bootstrapServer,
+QUORUM_BOOTSTRAP_SERVERS_CONFIG
+)
+);
+}
+
+return InetSocketAddress.createUnresolved(host, port);

Review Comment:
   I am not sure. I know that `InetSocketAddress.createUnresolved` throws an `IllegalArgumentException` if host 
is `null` or port is outside the valid range. I think it is better for 
`Utils.getHost` and `Utils.getPort` to catch invalid host and port 
values so we can throw the corresponding `ConfigException`. I filed 
https://issues.apache.org/jira/browse/KAFKA-16824 to improve the implementation 
of those methods.
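A sketch of validating entries before calling `InetSocketAddress.createUnresolved`, so malformed input surfaces as a configuration error instead of an `IllegalArgumentException`. `BootstrapServerParser` and `SketchConfigException` are hypothetical stand-ins (the latter for Kafka's `ConfigException`), and the sketch splits on the last `:`, so it does not handle bracketed IPv6 literals. It also uses `%s` placeholders, since `String.format` does not interpret SLF4J-style `{}`.

```java
import java.net.InetSocketAddress;

// Hypothetical stand-in for org.apache.kafka.common.config.ConfigException.
final class SketchConfigException extends RuntimeException {
    SketchConfigException(String message) {
        super(message);
    }
}

final class BootstrapServerParser {
    static final String CONFIG_NAME = "controller.quorum.bootstrap.servers";

    // Parses "host:port" and rejects bad entries with a config error, so that
    // createUnresolved never sees an empty host or an out-of-range port.
    static InetSocketAddress parse(String entry) {
        int colon = entry.lastIndexOf(':');
        String host = colon > 0 ? entry.substring(0, colon) : "";
        int port = -1;
        if (colon >= 0) {
            try {
                port = Integer.parseInt(entry.substring(colon + 1));
            } catch (NumberFormatException e) {
                port = -1; // reported below as an invalid port
            }
        }
        if (host.isEmpty() || port < 0 || port > 65535) {
            // String.format takes %s placeholders; SLF4J-style {} would be printed literally.
            throw new SketchConfigException(String.format(
                "Failed to parse %s for the configuration %s. Each entry should be in the form \"{host}:{port}\"",
                entry,
                CONFIG_NAME
            ));
        }
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```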






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-25 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1614958193


##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -17,108 +17,196 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
 public class RequestManager {
-private final Map<Integer, ConnectionState> connections = new HashMap<>();
-private final List<Integer> voters = new ArrayList<>();
+private final Map<String, ConnectionState> connections = new HashMap<>();
+private final ArrayList<Node> bootstrapServers;
 
 private final int retryBackoffMs;
 private final int requestTimeoutMs;
 private final Random random;
 
-public RequestManager(Set<Integer> voterIds,
-  int retryBackoffMs,
-  int requestTimeoutMs,
-  Random random) {
-
+public RequestManager(
+Collection<Node> bootstrapServers,
+int retryBackoffMs,
+int requestTimeoutMs,
+Random random
+) {
+this.bootstrapServers = new ArrayList<>(bootstrapServers);
 this.retryBackoffMs = retryBackoffMs;
 this.requestTimeoutMs = requestTimeoutMs;
-this.voters.addAll(voterIds);
 this.random = random;
-
-for (Integer voterId: voterIds) {
-ConnectionState connection = new ConnectionState(voterId);
-connections.put(voterId, connection);
-}
-}
-
-public ConnectionState getOrCreate(int id) {
-return connections.computeIfAbsent(id, key -> new ConnectionState(id));
 }
 
-public OptionalInt findReadyVoter(long currentTimeMs) {
-int startIndex = random.nextInt(voters.size());
-OptionalInt res = OptionalInt.empty();
-for (int i = 0; i < voters.size(); i++) {
-int index = (startIndex + i) % voters.size();
-Integer voterId = voters.get(index);
-ConnectionState connection = connections.get(voterId);
-boolean isReady = connection.isReady(currentTimeMs);
+public Optional<Node> findReadyBootstrapServer(long currentTimeMs) {
+int startIndex = random.nextInt(bootstrapServers.size());
+Optional<Node> res = Optional.empty();
+for (int i = 0; i < bootstrapServers.size(); i++) {
+int index = (startIndex + i) % bootstrapServers.size();
+Node node = bootstrapServers.get(index);
 
-if (isReady) {
-res = OptionalInt.of(voterId);
-} else if (connection.inFlightCorrelationId.isPresent()) {
-res = OptionalInt.empty();
+if (isReady(node, currentTimeMs)) {
+res = Optional.of(node);
+} else if (hasInflightRequest(node, currentTimeMs)) {
+res = Optional.empty();
 break;
 }
 }
+
 return res;
 }
 
-public long backoffBeforeAvailableVoter(long currentTimeMs) {
-long minBackoffMs = Long.MAX_VALUE;
-for (Integer voterId : voters) {
-ConnectionState connection = connections.get(voterId);
-if (connection.isReady(currentTimeMs)) {
+public long backoffBeforeAvailableBootstrapServer(long currentTimeMs) {
+long minBackoffMs = Math.max(retryBackoffMs, requestTimeoutMs);
+for (Node node : bootstrapServers) {
+if (isReady(node, currentTimeMs)) {
 return 0L;
-} else if (connection.isBackingOff(currentTimeMs)) {
-minBackoffMs = Math.min(minBackoffMs, connection.remainingBackoffMs(currentTimeMs));
+} else if (isBackingOff(node, currentTimeMs)) {
+minBackoffMs = Math.min(minBackoffMs, remainingBackoffMs(node, currentTimeMs));
 } else {
-minBackoffMs = Math.min(minBackoffMs, connection.remainingRequestTimeMs(currentTimeMs));
+minBackoffMs = Math.min(minBackoffMs, remainingRequestTimeMs(node, currentTimeMs));
 }
 }
+
 return minBackoffMs;
 }
 
+public boolean hasRequestTimedOut(Node node, long timeMs) {
+ConnectionState state = connections.get(node.idString());
+if (state == null) {
+return false;
+}
+
+return state.hasRequestTimedOut(timeMs);
+}
+
+public boolean isReady(Node node, long timeMs) {
+ConnectionState state = connections.get(node.idString());
+if (state == null) {
+return true;
+}
+
+boolean ready = state.isReady(timeMs);
+if (ready) {
+reset(node);
+}
+
+return ready;
+}
+
+public boolean isBackingOff(Node node, long timeMs) {
+ConnectionState state = 
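
The selection and backoff rules in this hunk can be restated in a small, self-contained form. This is a sketch under stated assumptions, not Kafka's `RequestManager`. Servers are plain string ids instead of `Node`, and `ServerState`, `onRequestSent` and `onRequestFailed` are hypothetical stand-ins for `ConnectionState` and its transitions. The sketch follows the diff in three ways:

- A server with no recorded state counts as ready.
- Any server with a request in flight makes the lookup return empty.
- The backoff estimate is capped at `max(retryBackoffMs, requestTimeoutMs)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

// Hypothetical, simplified stand-in for RequestManager.ConnectionState.
final class ServerState {
    long backoffUntilMs = 0L;     // not ready before this time after a failure
    long requestDeadlineMs = -1L; // >= 0 while a request is outstanding

    boolean hasInflightRequest(long nowMs) {
        // A request past its deadline counts as timed out, not in flight.
        return requestDeadlineMs >= 0 && nowMs < requestDeadlineMs;
    }

    boolean isBackingOff(long nowMs) {
        return !hasInflightRequest(nowMs) && nowMs < backoffUntilMs;
    }

    boolean isReady(long nowMs) {
        return !hasInflightRequest(nowMs) && nowMs >= backoffUntilMs;
    }
}

final class BootstrapSelector {
    private final List<String> servers;
    private final Map<String, ServerState> states = new HashMap<>();
    private final int retryBackoffMs;
    private final int requestTimeoutMs;
    private final Random random;

    BootstrapSelector(List<String> servers, int retryBackoffMs, int requestTimeoutMs, Random random) {
        this.servers = new ArrayList<>(servers); // defensive copy of the bootstrap list
        this.retryBackoffMs = retryBackoffMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.random = random;
    }

    // Hypothetical transition: a request was sent to this server.
    void onRequestSent(String id, long nowMs) {
        ServerState s = states.computeIfAbsent(id, k -> new ServerState());
        s.requestDeadlineMs = nowMs + requestTimeoutMs;
    }

    // Hypothetical transition: the request failed, so back off before retrying.
    void onRequestFailed(String id, long nowMs) {
        ServerState s = states.computeIfAbsent(id, k -> new ServerState());
        s.requestDeadlineMs = -1L;
        s.backoffUntilMs = nowMs + retryBackoffMs;
    }

    Optional<String> findReadyServer(long nowMs) {
        int start = random.nextInt(servers.size()); // random rotation over the list
        Optional<String> ready = Optional.empty();
        for (int i = 0; i < servers.size(); i++) {
            String id = servers.get((start + i) % servers.size());
            ServerState s = states.get(id); // null: never contacted, treated as ready
            if (s != null && s.hasInflightRequest(nowMs)) {
                return Optional.empty(); // a request is already outstanding
            }
            if (!ready.isPresent() && (s == null || s.isReady(nowMs))) {
                ready = Optional.of(id);
            }
        }
        return ready;
    }

    long backoffBeforeAvailableServer(long nowMs) {
        long minBackoffMs = Math.max(retryBackoffMs, requestTimeoutMs);
        for (String id : servers) {
            ServerState s = states.get(id);
            if (s == null || s.isReady(nowMs)) {
                return 0L;
            } else if (s.isBackingOff(nowMs)) {
                minBackoffMs = Math.min(minBackoffMs, s.backoffUntilMs - nowMs);
            } else {
                // In flight: wait at most until the request times out.
                minBackoffMs = Math.min(minBackoffMs, s.requestDeadlineMs - nowMs);
            }
        }
        return minBackoffMs;
    }
}
```

There is one small difference from the diff. Its loop does not stop at a ready server, so it returns the last ready server it visits, while this sketch keeps the first. In both versions the random start index spreads the choice across the bootstrap list.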

Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-25 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1610641974


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1366,6 +1368,26 @@ class KafkaConfigTest {
 assertEquals(expectedVoters, addresses)
   }
 
+  @Test
+  def testParseQuorumBootstrapServers(): Unit = {

Review Comment:
   Okay. It does look like there are bugs in the implementation of 
`Utils.getHost` and `Utils.getPort`. I don't really want to fix them in this 
PR. I created this issue: https://issues.apache.org/jira/browse/KAFKA-16824






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-23 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1611947146


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -118,6 +120,10 @@ class RaftManagerTest {
   new Metrics(Time.SYSTEM),
   Option.empty,
   
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
+  config.quorumBootstrapServers

Review Comment:
   Fixed.






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-23 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1611946739


##
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##
@@ -231,4 +266,26 @@ public String toString() {
 return "non-empty list";
 }
 }
+
+public static class ControllerQuorumBootstrapServersValidator implements ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null);
+}
+
+@SuppressWarnings("unchecked")
+List<String> entries = (List<String>) value;
+
+// Attempt to parse the connect strings
+for (String entry : entries) {
+QuorumConfig.parseBootstrapServer(entry);

Review Comment:
   Fixed.
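
The validator rejects a null value and then parses every entry, so a malformed address is reported during configuration validation. A minimal Java sketch of that pattern follows. Here `parseBootstrapServer` is a hypothetical stand-in for `QuorumConfig.parseBootstrapServer`. `IllegalArgumentException` replaces Kafka's `ConfigException` so the example has no Kafka dependencies.

```java
import java.net.InetSocketAddress;
import java.util.List;

final class BootstrapServersValidator {
    // Hypothetical stand-in for QuorumConfig.parseBootstrapServer: expects "host:port".
    static InetSocketAddress parseBootstrapServer(String entry) {
        int colon = entry.lastIndexOf(':');
        if (colon <= 0 || colon == entry.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + entry);
        }
        // NumberFormatException is a subclass of IllegalArgumentException, and
        // createUnresolved rejects ports outside 0-65535 with IllegalArgumentException.
        int port = Integer.parseInt(entry.substring(colon + 1));
        return InetSocketAddress.createUnresolved(entry.substring(0, colon), port);
    }

    static void ensureValid(String name, Object value) {
        if (value == null) {
            throw new IllegalArgumentException("Value for " + name + " must not be null");
        }
        if (!(value instanceof List)) {
            throw new IllegalArgumentException("Value for " + name + " must be a list");
        }
        // Attempt to parse every entry so a bad address fails validation.
        for (Object entry : (List<?>) value) {
            try {
                parseBootstrapServer(String.valueOf(entry));
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Invalid entry in " + name + ": " + entry, e);
            }
        }
    }
}
```

This sketch casts to `List<?>` and converts each element with `String.valueOf`. That avoids the unchecked cast that needs `@SuppressWarnings` in the diff, at the cost of an extra `instanceof` check.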






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-23 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1611946424


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -53,15 +56,41 @@ final public class VoterSet {
 }
 
 /**
- * Returns the socket address for a given voter at a given listener.
+ * Returns the node information for all the given voter ids and listener.
  *
- * @param voter the id of the voter
- * @param listener the name of the listener
- * @return the socket address if it exists, otherwise {@code Optional.empty()}
+ * @param voterIds the id of the voters

Review Comment:
   Fixed.
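
The revised Javadoc describes a lookup that returns node information for a set of voter ids on one listener. The diff does not show the method body, so the following is a hypothetical sketch and makes three assumptions:

- Voters are modeled as a map from voter id to per-listener endpoints.
- `VoterNode` and `VoterEndpoints` are invented classes.
- Voters with no endpoint on the requested listener are skipped. The real `VoterSet` may handle that case differently.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical node description: voter id plus host and port on one listener.
final class VoterNode {
    final int id;
    final String host;
    final int port;

    VoterNode(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    @Override
    public String toString() {
        return id + "@" + host + ":" + port;
    }
}

final class VoterEndpoints {
    // voter id -> (listener name -> address)
    private final Map<Integer, Map<String, InetSocketAddress>> voters;

    VoterEndpoints(Map<Integer, Map<String, InetSocketAddress>> voters) {
        this.voters = voters;
    }

    List<VoterNode> voterNodes(Collection<Integer> voterIds, String listener) {
        List<VoterNode> nodes = new ArrayList<>();
        for (int id : voterIds) {
            Map<String, InetSocketAddress> listeners = voters.get(id);
            if (listeners == null) {
                continue; // not a known voter
            }
            InetSocketAddress address = listeners.get(listener);
            if (address == null) {
                continue; // voter has no endpoint for this listener
            }
            nodes.add(new VoterNode(id, address.getHostString(), address.getPort()));
        }
        return nodes;
    }
}
```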






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-23 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1611939749


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -393,17 +422,35 @@ public void initialize(
 logger.info("Reading KRaft snapshot and log as part of the initialization");
 partitionState.updateState();
 
-VoterSet lastVoterSet = partitionState.lastVoterSet();
-requestManager = new RequestManager(
-lastVoterSet.voterIds(),
-quorumConfig.retryBackoffMs(),
-quorumConfig.requestTimeoutMs(),
-random
-);
+if (requestManager == null) {
+// The request manager wasn't created using the bootstrap servers
+// create it using the voters static configuration
+List<Node> bootstrapNodes = voterAddresses
+.entrySet()
+.stream()
+.map(entry ->
+new Node(
+entry.getKey(),
+entry.getValue().getHostString(),
+entry.getValue().getPort()
+)
+)
+.collect(Collectors.toList());
+
+logger.info("Starting request manager with bootstrap servers: {}", bootstrapNodes);

Review Comment:
   Done.
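
When no bootstrap servers were supplied, the new branch in `initialize` falls back to the statically configured voters. It builds one `Node` per `voterAddresses` entry from the id, `getHostString()` and `getPort()`. The sketch below shows that stream pipeline in isolation. `SimpleNode` is a hypothetical stand-in for `org.apache.kafka.common.Node`, and the voter map is assumed to be a `Map<Integer, InetSocketAddress>`.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for org.apache.kafka.common.Node.
final class SimpleNode {
    final int id;
    final String host;
    final int port;

    SimpleNode(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }
}

final class VoterBootstrap {
    // One node per statically configured voter, as in the diff's fallback path.
    static List<SimpleNode> toBootstrapNodes(Map<Integer, InetSocketAddress> voterAddresses) {
        return voterAddresses
            .entrySet()
            .stream()
            .map(entry ->
                new SimpleNode(
                    entry.getKey(),
                    // getHostString() does not trigger a reverse DNS lookup, unlike getHostName()
                    entry.getValue().getHostString(),
                    entry.getValue().getPort()
                )
            )
            .collect(Collectors.toList());
    }
}
```

The resulting order follows the map's iteration order. That matters little here, because the `RequestManager` in this PR picks a random starting index when it looks for a ready bootstrap server.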






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-22 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1610641974


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1366,6 +1368,26 @@ class KafkaConfigTest {
 assertEquals(expectedVoters, addresses)
   }
 
+  @Test
+  def testParseQuorumBootstrapServers(): Unit = {

Review Comment:
   Okay. It does look like there are bugs in the implementation of 
`Utils.getHost` and `Utils.getPort`. I don't really want to fix them in this 
PR. I created this issue: https://issues.apache.org/jira/browse/KAFKA-16824






Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-22 Thread via GitHub


hachikuji commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1610398594


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -118,6 +120,10 @@ class RaftManagerTest {
   new Metrics(Time.SYSTEM),
   Option.empty,
   
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
+  config.quorumBootstrapServers

Review Comment:
   nit: we seem to have this same little snippet in a few places. Is there 
somewhere we could add a helper?



##
core/src/test/resources/log4j.properties:
##
@@ -20,6 +20,8 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
+# TODO; remove this line

Review Comment:
   Reminder about TODO



##
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##
@@ -231,4 +266,26 @@ public String toString() {
 return "non-empty list";
 }
 }
+
+public static class ControllerQuorumBootstrapServersValidator implements ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null);
+}
+
+@SuppressWarnings("unchecked")
+List<String> entries = (List<String>) value;
+
+// Attempt to parse the connect strings
+for (String entry : entries) {
+QuorumConfig.parseBootstrapServer(entry);

Review Comment:
   nit: drop unnecessary `QuorumConfig` prefix?



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1366,6 +1368,26 @@ class KafkaConfigTest {
 assertEquals(expectedVoters, addresses)
   }
 
+  @Test
+  def testParseQuorumBootstrapServers(): Unit = {

Review Comment:
   Perhaps we should have some invalid test cases as well?
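
A table-driven set of negative cases is one way to cover this. The sketch below differs from the real test in three ways:

- It is in Java rather than the Scala of `KafkaConfigTest`.
- It checks a hypothetical `parseBootstrapServer` helper instead of going through `KafkaConfig`.
- The specific invalid inputs are illustrative choices, not cases taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class BootstrapServerCases {
    // Hypothetical parser used only for this illustration: exactly one ':' and a valid port.
    static void parseBootstrapServer(String entry) {
        int colon = entry.lastIndexOf(':');
        if (colon <= 0 || entry.indexOf(':') != colon) {
            throw new IllegalArgumentException("Expected host:port but got: " + entry);
        }
        // NumberFormatException (empty or non-numeric port) is an IllegalArgumentException.
        int port = Integer.parseInt(entry.substring(colon + 1));
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + entry);
        }
    }

    // Returns the inputs that were unexpectedly accepted; empty means every case was rejected.
    static List<String> acceptedInvalidInputs(String... inputs) {
        List<String> accepted = new ArrayList<>();
        for (String input : inputs) {
            try {
                parseBootstrapServer(input);
                accepted.add(input);
            } catch (IllegalArgumentException expected) {
                // rejected, which is what a negative test case wants
            }
        }
        return accepted;
    }
}
```

Collecting the accepted inputs, rather than failing on the first one, makes a failing test report every case that slipped through at once.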



##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -17,108 +17,196 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
 public class RequestManager {
-private final Map<Integer, ConnectionState> connections = new HashMap<>();
-private final List<Integer> voters = new ArrayList<>();
+private final Map<String, ConnectionState> connections = new HashMap<>();
+private final ArrayList<Node> bootstrapServers;
 
 private final int retryBackoffMs;
 private final int requestTimeoutMs;
 private final Random random;
 
-public RequestManager(Set<Integer> voterIds,
-  int retryBackoffMs,
-  int requestTimeoutMs,
-  Random random) {
-
+public RequestManager(
+Collection<Node> bootstrapServers,
+int retryBackoffMs,
+int requestTimeoutMs,
+Random random
+) {
+this.bootstrapServers = new ArrayList<>(bootstrapServers);
 this.retryBackoffMs = retryBackoffMs;
 this.requestTimeoutMs = requestTimeoutMs;
-this.voters.addAll(voterIds);
 this.random = random;
-
-for (Integer voterId: voterIds) {
-ConnectionState connection = new ConnectionState(voterId);
-connections.put(voterId, connection);
-}
-}
-
-public ConnectionState getOrCreate(int id) {
-return connections.computeIfAbsent(id, key -> new ConnectionState(id));
 }
 
-public OptionalInt findReadyVoter(long currentTimeMs) {
-int startIndex = random.nextInt(voters.size());
-OptionalInt res = OptionalInt.empty();
-for (int i = 0; i < voters.size(); i++) {
-int index = (startIndex + i) % voters.size();
-Integer voterId = voters.get(index);
-ConnectionState connection = connections.get(voterId);
-boolean isReady = connection.isReady(currentTimeMs);
+public Optional<Node> findReadyBootstrapServer(long currentTimeMs) {
+int startIndex = random.nextInt(bootstrapServers.size());
+Optional<Node> res = Optional.empty();
+for (int i = 0; i < bootstrapServers.size(); i++) {
+int index = (startIndex + i) % bootstrapServers.size();
+Node node = bootstrapServers.get(index);
 
-if (isReady) {
-res = OptionalInt.of(voterId);
-} else if (connection.inFlightCorrelationId.isPresent()) {
-res = OptionalInt.empty();
+if (isReady(node, currentTimeMs)) {
+res = Optional.of(node);
+} else if (hasInflightRequest(node, currentTimeMs)) {
+res = Optional.empty();
 break;
 }
 }
+
 return res;
 }
 
-public long