Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-24 Thread via GitHub


cmccabe commented on PR #16008:
URL: https://github.com/apache/kafka/pull/16008#issuecomment-2129992325

   merged (from CLI, since GH is acting flaky)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-24 Thread via GitHub


cmccabe closed pull request #16008: KAFKA-16516: Fix the controller node 
provider for broker to control channel
URL: https://github.com/apache/kafka/pull/16008





Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-22 Thread via GitHub


jsancio commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1610457959


##
core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala:
##
@@ -120,15 +118,15 @@ object RaftControllerNodeProvider {
  */
 class RaftControllerNodeProvider(
   val raftManager: RaftManager[ApiMessageAndVersion],
-  controllerQuorumVoterNodes: Seq[Node],
   val listenerName: ListenerName,
   val securityProtocol: SecurityProtocol,
   val saslMechanism: String
 ) extends ControllerNodeProvider with Logging {
-  private val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
+
+  private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName.value())
 
   override def getControllerInfo(): ControllerInformation =
-    ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode),
+    ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.flatMap(idToNode),

Review Comment:
   Okay. In a future PR, I'll change this to `Option[Node] 
RaftManager.leaderNode()`, since it looks like the node is always the leader 
node and not some arbitrary voter.
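
For illustration only, here is a minimal Java sketch of what such a `leaderNode()` helper could look like. `LeaderNodeSketch`, its nested `Node`, and the in-memory voter map are hypothetical stand-ins chosen to keep the example self-contained; they are not the actual `RaftManager` API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical sketch; none of these types are the real Kafka classes.
final class LeaderNodeSketch {
    // Minimal stand-in for org.apache.kafka.common.Node.
    static final class Node {
        final int id;
        final String host;
        final int port;

        Node(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }
    }

    private final Map<Integer, Node> voterNodes; // voter id -> endpoint for one listener
    private final OptionalInt leaderId;          // empty while no leader is known

    LeaderNodeSketch(Map<Integer, Node> voterNodes, OptionalInt leaderId) {
        this.voterNodes = voterNodes;
        this.leaderId = leaderId;
    }

    // Folds "who is the leader?" and "what is its endpoint?" into one call,
    // so callers no longer pass an arbitrary voter id themselves.
    Optional<Node> leaderNode() {
        if (!leaderId.isPresent()) {
            return Optional.empty();
        }
        return Optional.ofNullable(voterNodes.get(leaderId.getAsInt()));
    }
}
```

With this shape, the caller would get an empty result both when no leader is elected and when the leader id has no known endpoint.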






Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on PR #16008:
URL: https://github.com/apache/kafka/pull/16008#issuecomment-2123394515

   > Are there any tests that we should add or do the existing tests cover this 
functionality?
   
   I think there are some that cover it already, but I added a test of 
controller failover in `KRaftClusterTest`, just to be sure.





Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1608907374


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -313,4 +314,8 @@ class KafkaRaftManager[T](
   override def leaderAndEpoch: LeaderAndEpoch = {
     client.leaderAndEpoch
   }
+
+  override def idToNode(id: Int, listener: String): Option[Node] = {
+    client.idToNode(id, listener).toScala
+  }

Review Comment:
   ack



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
         }
     }
 
+    public Optional<Node> idToNode(int id, String listener) {

Review Comment:
   ack






Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1608906143


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -366,4 +368,16 @@ public static VoterSet fromInetSocketAddresses(String listener, Map
+    public Optional<Node> idToNode(int id, String listener) {
+        VoterNode voterNode = voters.get(id);
+        if (voterNode == null) {
+            return Optional.empty();
+        }
+        InetSocketAddress address = voterNode.listeners.get(listener);
+        if (address == null) {
+            return Optional.empty();
+        }
+        return Optional.of(new Node(id, address.getHostString(), address.getPort()));
+    }

Review Comment:
   ok



##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -548,6 +550,10 @@ public VotedState votedStateOrThrow() {
             .orElseThrow(() -> new IllegalStateException("Expected to be Voted, but current state is " + state));
     }
 
+    public Optional<Node> idToNode(int id, String listener) {
+        return latestVoterSet.get().idToNode(id, listener);
+    }

Review Comment:
   removed






Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1608904666


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
         }
     }
 
+    public Optional<Node> idToNode(int id, String listener) {
+        return quorum.idToNode(id, listener);

Review Comment:
   ok






Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


jsancio commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1608891761


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
         }
     }
 
+    public Optional<Node> idToNode(int id, String listener) {
+        return quorum.idToNode(id, listener);

Review Comment:
   Let's use `partitionState` directly and remove the changes to `QuorumState`.
   ```java
   return partitionState.latestVoterSet().idToNode(id, listener);
   ```



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
         }
     }
 
+    public Optional<Node> idToNode(int id, String listener) {

Review Comment:
   Similar to my other comments but how about `voterNode` for the name of the 
method?



##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -313,4 +314,8 @@ class KafkaRaftManager[T](
   override def leaderAndEpoch: LeaderAndEpoch = {
     client.leaderAndEpoch
   }
+
+  override def idToNode(id: Int, listener: String): Option[Node] = {
+    client.idToNode(id, listener).toScala
+  }

Review Comment:
   I'd prefer that we call this something like `voterNode`, as it makes it clear 
that this only resolves to an `Option[Node]` if the id is a voter/controller.
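
As a rough illustration of that contract, the following Java sketch resolves a node only when the id belongs to a voter and that voter exposes the requested listener. `VoterLookupSketch`, its nested `Node`, and the nested-map layout are hypothetical stand-ins for this example, not the real `VoterSet` or `KafkaRaftClient` code.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; this is not the real VoterSet class.
final class VoterLookupSketch {
    // Minimal stand-in for org.apache.kafka.common.Node.
    static final class Node {
        final int id;
        final String host;
        final int port;

        Node(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }
    }

    // voter id -> (listener name -> address)
    private final Map<Integer, Map<String, InetSocketAddress>> voters;

    VoterLookupSketch(Map<Integer, Map<String, InetSocketAddress>> voters) {
        this.voters = voters;
    }

    // Empty when `id` is not a voter, or when the voter has no such listener.
    // Optional.map turns a null lookup result into an empty Optional.
    Optional<Node> voterNode(int id, String listener) {
        return Optional.ofNullable(voters.get(id))
            .map(listeners -> listeners.get(listener))
            .map(address -> new Node(id, address.getHostString(), address.getPort()));
    }
}
```

The name `voterNode` signals to callers that a broker id, which is never in the voter map, always yields an empty result.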



##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -548,6 +550,10 @@ public VotedState votedStateOrThrow() {
             .orElseThrow(() -> new IllegalStateException("Expected to be Voted, but current state is " + state));
     }
 
+    public Optional<Node> idToNode(int id, String listener) {
+        return latestVoterSet.get().idToNode(id, listener);
+    }

Review Comment:
   See my other comment but I think we can remove this.



##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -366,4 +368,16 @@ public static VoterSet fromInetSocketAddresses(String listener, Map
+    public Optional<Node> idToNode(int id, String listener) {
+        VoterNode voterNode = voters.get(id);
+        if (voterNode == null) {
+            return Optional.empty();
+        }
+        InetSocketAddress address = voterNode.listeners.get(listener);
+        if (address == null) {
+            return Optional.empty();
+        }
+        return Optional.of(new Node(id, address.getHostString(), address.getPort()));
+    }

Review Comment:
   Okay. I added a similar method in this PR: 
https://github.com/apache/kafka/pull/15986/files#diff-7164c449a4cc53dd28cc1a7201fa8b7a824749dab013fb33dff101a1002565daR86-R101
   
   If you agree, would you mind changing the name to `voterNode` to minimize 
conflicts? Don't worry about using `ListenerName` instead of `String`, as my PR 
deals with that change.






Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-21 Thread via GitHub


cmccabe commented on PR #16008:
URL: https://github.com/apache/kafka/pull/16008#issuecomment-2123296527

   I re-ran all the failing tests locally, and they all passed (they were 
flakes).

