hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565722046



##########
File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
##########
@@ -139,6 +145,25 @@ class KafkaNetworkChannelTest {
     }
   }
 
+  @Test
+  def testNonRoutableAddressUpdateRequest(): Unit = {
+    val destinationId = 2
+    assertThrows(classOf[IllegalArgumentException],
+      () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
       Can we move this to `RaftConfigTest`? It's not really part of the 
behavior of `KafkaNetworkChannel`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,7 +87,48 @@
     private final int electionBackoffMaxMs;
     private final int fetchTimeoutMs;
     private final int appendLingerMs;
-    private final Map<Integer, InetSocketAddress> voterConnections;
+    private final Map<Integer, AddressSpec> voterConnections;
+
+    public static abstract class AddressSpec {
+       public abstract InetSocketAddress address();

Review comment:
       Do we need this in the abstract class? I was thinking we would only be 
able to access `InetSocketAddress` if the type is `InetAddressSpec`. Otherwise 
the type protection from `AddressSpec` loses its bite.

##########
File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
##########
@@ -139,6 +145,25 @@ class KafkaNetworkChannelTest {
     }
   }
 
+  @Test
+  def testNonRoutableAddressUpdateRequest(): Unit = {
+    val destinationId = 2
+    assertThrows(classOf[IllegalArgumentException],
+      () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))
+
+    // Update channel with a valid endpoint

Review comment:
       Not sure there's much value in the rest of this test. Seems effectively 
the same as `testSendAndReceiveOutboundRequest`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,7 +87,48 @@
     private final int electionBackoffMaxMs;
     private final int fetchTimeoutMs;
     private final int appendLingerMs;
-    private final Map<Integer, InetSocketAddress> voterConnections;
+    private final Map<Integer, AddressSpec> voterConnections;
+
+    public static abstract class AddressSpec {
+       public abstract InetSocketAddress address();
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            final AddressSpec that = (AddressSpec) obj;
+            return that.address().equals(address());
+        }
+    }
+
+    public static class InetAddressSpec extends AddressSpec {
+        private final InetSocketAddress address;
+
+        public InetAddressSpec(InetSocketAddress address) {
+            if (address.equals(UNROUTABLE_ADDRESS)) {
+                throw new IllegalArgumentException("Address not routable");
+            }
+            this.address = address;
+        }
+
+        @Override
+        public InetSocketAddress address() {
+            return address;
+        }
+    }
+
+    public static class UnknownAddressSpec extends AddressSpec {

Review comment:
       A common pattern for classes like this without any state is to create a 
static instance.
   ```java
     public static final UnknownAddressSpec INSTANCE = new UnknownAddressSpec();
   ```

##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -118,9 +119,20 @@ class KafkaRaftManager[T](
   private val raftIoThread = new RaftIoThread(raftClient)
 
   def startup(): Unit = {
+    // Update the voter endpoints (if valid) with what's in RaftConfig
+    val voterAddresses: util.Map[Integer, AddressSpec] = 
raftConfig.quorumVoterConnections
+    for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
+      voterAddressEntry.getValue match {
+        case spec: InetAddressSpec => {
+          netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
+        }
+        case invalid: AddressSpec => {
+          logger.warn(s"Skipping channel update for destination ID: 
${voterAddressEntry.getKey} " +

Review comment:
       This could be `info` I think in the case of `UnknownAddressSpec`. It is 
expected behavior to skip the update. We could add a third case for unexpected 
`AddressSpec` types.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to