[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566406761



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -99,7 +99,7 @@
 public final InetSocketAddress address;
 
 public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(NON_ROUTABLE_ADDRESS)) {
+if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   Right. Makes sense. Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566394059



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -99,7 +99,7 @@
 public final InetSocketAddress address;
 
 public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(NON_ROUTABLE_ADDRESS)) {
+if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   Yeah, I thought of that. But would there ever be a case where we'd need to construct the address spec with a null address? Probably not. At that point we might as well use a null for the address spec itself, I guess. Fair enough, will change.
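
   For illustration only: one way to make the "a null address is never valid" reading explicit is to fail fast in the constructor. The final code is not shown in this thread, so the class name `StrictInetAddressSpec` and the choice of `Objects.requireNonNull` are assumptions; `NON_ROUTABLE_ADDRESS` is assumed to be `0.0.0.0:0`, as in the tests quoted elsewhere in this review.

```java
import java.net.InetSocketAddress;
import java.util.Objects;

// Hypothetical stand-in for RaftConfig.InetAddressSpec; not the PR's final code.
final class StrictInetAddressSpec {
    // Assumed to mirror NON_ROUTABLE_ADDRESS from the diff (0.0.0.0:0).
    static final InetSocketAddress NON_ROUTABLE_ADDRESS = new InetSocketAddress("0.0.0.0", 0);

    final InetSocketAddress address;

    StrictInetAddressSpec(InetSocketAddress address) {
        // Reject null up front instead of silently storing it.
        this.address = Objects.requireNonNull(address, "address must not be null");
        // Reject the placeholder address; it cannot be connected to.
        if (address.equals(NON_ROUTABLE_ADDRESS)) {
            throw new IllegalArgumentException("Address not routable: " + address);
        }
    }
}
```

   With this shape a null argument surfaces as a `NullPointerException` at construction time, and callers that have no address would use a different spec type rather than a null field.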









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566379088



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -89,8 +92,23 @@
 private final int appendLingerMs;
 private final Map voterConnections;
 
-public static abstract class AddressSpec {
-   public abstract InetSocketAddress address();
+public interface AddressSpec {
+}
+
+public static class InetAddressSpec implements AddressSpec {
+public final InetSocketAddress address;
+
+public InetAddressSpec(InetSocketAddress address) {
+if (address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   Good catch! Added









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566378930



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -102,31 +120,23 @@ public boolean equals(Object obj) {
 return false;
 }
 
-final AddressSpec that = (AddressSpec) obj;
-return that.address().equals(address());
+final InetAddressSpec that = (InetAddressSpec) obj;
+return that.address.equals(address);
 }
 }
 
-public static class InetAddressSpec extends AddressSpec {
-private final InetSocketAddress address;
-
-public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(UNROUTABLE_ADDRESS)) {
-throw new IllegalArgumentException("Address not routable");
-}
-this.address = address;
+public static class UnknownAddressSpec implements AddressSpec {
+private UnknownAddressSpec() {

Review comment:
   Yeah, that's true actually. It should work fine for the map equals test case that contains an instance of `UnknownAddressSpec`. Removed.
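
   A sketch of why map equality can still hold without extra code on the unknown spec. What exactly was removed is not shown in this thread; the sketch assumes a single shared instance (`INSTANCE` is a hypothetical name), in which case the default identity-based `Object.equals` is enough for `Map.equals`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; a sketch, not the PR's code.
interface AddressSpecSketch {}

final class UnknownAddressSpecSketch implements AddressSpecSketch {
    // Assumed shared instance; no equals/hashCode override is defined.
    static final UnknownAddressSpecSketch INSTANCE = new UnknownAddressSpecSketch();

    private UnknownAddressSpecSketch() {}
}

final class MapEqualsDemo {
    // Builds two separate maps that hold the same shared instance and compares them.
    static boolean mapsEqual() {
        Map<Integer, AddressSpecSketch> expected = new HashMap<>();
        Map<Integer, AddressSpecSketch> actual = new HashMap<>();
        expected.put(1, UnknownAddressSpecSketch.INSTANCE);
        actual.put(1, UnknownAddressSpecSketch.INSTANCE);
        // Map.equals compares values with equals(); identity suffices here.
        return expected.equals(actual);
    }

    public static void main(String[] args) {
        System.out.println(mapsEqual());
    }
}
```

   If distinct instances were created instead of one shared instance, identity equality would no longer be sufficient and an `equals`/`hashCode` pair would be needed.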









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566378383



##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -1001,6 +1001,12 @@ class KafkaConfigTest {
 assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
 
+  @Test
+  def testNonRoutableAddressSpecException(): Unit = {
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
   Yeah, the only thing this tests is that `InetAddressSpec` never accepts `0.0.0.0:0` as a parameter. We could remove it instead.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566377621



##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -126,9 +126,13 @@ class KafkaRaftManager[T](
 case spec: InetAddressSpec => {
   netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
 }
+case _: UnknownAddressSpec => {

Review comment:
   Ah makes sense. Removed.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566314131



##
File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
##
@@ -139,6 +145,25 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @Test
+  def testNonRoutableAddressUpdateRequest(): Unit = {
+val destinationId = 2
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
   All the tests in `RaftConfigTest` were migrated to `KafkaConfigTest` in 
an earlier PR. I'll move this also.

##
File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
##
@@ -139,6 +145,25 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @Test
+  def testNonRoutableAddressUpdateRequest(): Unit = {
+val destinationId = 2
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))
+
+// Update channel with a valid endpoint

Review comment:
   Fair enough. Removed.

##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -118,9 +119,20 @@ class KafkaRaftManager[T](
   private val raftIoThread = new RaftIoThread(raftClient)
 
   def startup(): Unit = {
+// Update the voter endpoints (if valid) with what's in RaftConfig
+val voterAddresses: util.Map[Integer, AddressSpec] = raftConfig.quorumVoterConnections
+for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
+  voterAddressEntry.getValue match {
+case spec: InetAddressSpec => {
+  netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
+}
+case invalid: AddressSpec => {
+  logger.warn(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +

Review comment:
   Ack.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566313405



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,7 +87,48 @@
 private final int electionBackoffMaxMs;
 private final int fetchTimeoutMs;
 private final int appendLingerMs;
-private final Map voterConnections;
+private final Map voterConnections;
+
+public static abstract class AddressSpec {
+   public abstract InetSocketAddress address();
+
+@Override
+public boolean equals(Object obj) {
+if (this == obj) {
+return true;
+}
+
+if (obj == null || getClass() != obj.getClass()) {
+return false;
+}
+
+final AddressSpec that = (AddressSpec) obj;
+return that.address().equals(address());
+}
+}
+
+public static class InetAddressSpec extends AddressSpec {
+private final InetSocketAddress address;
+
+public InetAddressSpec(InetSocketAddress address) {
+if (address.equals(UNROUTABLE_ADDRESS)) {
+throw new IllegalArgumentException("Address not routable");
+}
+this.address = address;
+}
+
+@Override
+public InetSocketAddress address() {
+return address;
+}
+}
+
+public static class UnknownAddressSpec extends AddressSpec {

Review comment:
   Ack. Refactored as stated above.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566312951



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,7 +87,48 @@
 private final int electionBackoffMaxMs;
 private final int fetchTimeoutMs;
 private final int appendLingerMs;
-private final Map voterConnections;
+private final Map voterConnections;
+
+public static abstract class AddressSpec {
+   public abstract InetSocketAddress address();

Review comment:
   Yeah, I did a second pass later and figured that `address()` defeats the purpose of the type enforcement (especially returning an `InetSocketAddress` for any address spec). I reverted to an interface, and now only `InetAddressSpec` contains an `InetSocketAddress`.
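
   To illustrate the point about type enforcement, here is a minimal sketch of the marker-interface shape. All names (`AddressSpecMarker`, `RoutableSpec`, `UnknownSpecMarker`, `addressOf`) are hypothetical and chosen for this example. Because the interface has no `address()` method, a caller has to establish that a spec is routable before it can obtain an address at all.

```java
import java.net.InetSocketAddress;
import java.util.Optional;

// Hypothetical names; a sketch of the marker-interface design, not the PR's code.
interface AddressSpecMarker {}

// The only implementation that carries an address.
final class RoutableSpec implements AddressSpecMarker {
    final InetSocketAddress address;

    RoutableSpec(InetSocketAddress address) {
        this.address = address;
    }
}

// Carries no address, so there is nothing to return by mistake.
final class UnknownSpecMarker implements AddressSpecMarker {}

final class AddressSpecs {
    // Callers must prove the spec is routable before they can see an address.
    static Optional<InetSocketAddress> addressOf(AddressSpecMarker spec) {
        if (spec instanceof RoutableSpec) {
            return Optional.of(((RoutableSpec) spec).address);
        }
        return Optional.empty();
    }
}
```

   With the earlier abstract-class shape, the unknown spec would have had to return something from `address()`, which is the problem described above.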









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673883



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -34,6 +34,7 @@ import scala.collection.mutable
 
 object KafkaNetworkChannel {
 
+  val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0)

Review comment:
   Yea, good catch. The AddressSpec makes sense.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673491



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -36,7 +36,9 @@
 public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters";
 public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " +
 "the set of voters in a comma-separated list of `{id}@{host}:{port}` entries. " +
-"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`";
+"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" +
+"If the voter endpoints are not known at startup, a non-routable address can be provided instead." +

Review comment:
   Fair enough. I can move it there.
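
   For readers unfamiliar with the `{id}@{host}:{port}` format in the doc string above, here is a rough parsing sketch. It is not Kafka's parser: `QuorumVotersParseSketch`, `parse`, and `isNonRoutable` are hypothetical names, and the sketch uses unresolved addresses so that no DNS lookup happens. It treats `0.0.0.0:0` as the non-routable placeholder, which is an assumption taken from the tests quoted elsewhere in this review.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; illustrates the documented format only.
final class QuorumVotersParseSketch {

    // Parses "1@host:port,2@host:port" into id -> unresolved address.
    static Map<Integer, InetSocketAddress> parse(String voters) {
        Map<Integer, InetSocketAddress> result = new LinkedHashMap<>();
        for (String entry : voters.split(",")) {
            String[] idAndEndpoint = entry.trim().split("@", 2);
            if (idAndEndpoint.length != 2) {
                throw new IllegalArgumentException("Expected {id}@{host}:{port} but got: " + entry);
            }
            int id = Integer.parseInt(idAndEndpoint[0]);
            String endpoint = idAndEndpoint[1];
            int colon = endpoint.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing port in: " + entry);
            }
            String host = endpoint.substring(0, colon);
            int port = Integer.parseInt(endpoint.substring(colon + 1));
            // Unresolved on purpose: parsing should not trigger a DNS lookup.
            result.put(id, InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Assumed placeholder for "endpoint not known at startup".
    static boolean isNonRoutable(InetSocketAddress address) {
        return "0.0.0.0".equals(address.getHostString()) && address.getPort() == 0;
    }
}
```

   For example, `parse("1@localhost:9092,2@0.0.0.0:0")` yields two entries, and only the second satisfies `isNonRoutable`.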









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673305



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -208,8 +209,9 @@ public KafkaRaftClient(
 int fetchMaxWaitMs,
 OptionalInt nodeId,
 LogContext logContext,
-Random random
-) {
+Random random,
+RaftConfig raftConfig
+) throws IOException {

Review comment:
   Yea. This is an artifact from the `quorumState.initialize` change. Since 
that's moved down to the `client.initialize`, we can remove it.
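
   A sketch of the general pattern behind this cleanup: once the I/O moves out of the constructor and into an `initialize` step, the constructor no longer needs a `throws IOException` clause. The types below (`StateStoreSketch`, `RaftClientSketch`, `readEpoch`) are hypothetical stand-ins and do not reflect the actual `KafkaRaftClient` or `QuorumState` signatures.

```java
import java.io.IOException;

// Hypothetical stand-in for a component whose loading may hit the disk.
interface StateStoreSketch {
    int readEpoch() throws IOException;
}

final class RaftClientSketch {
    private final StateStoreSketch store;
    private int epoch = -1; // -1 means "not initialized yet"

    // No I/O happens here, so the constructor declares no checked exception.
    RaftClientSketch(StateStoreSketch store) {
        this.store = store;
    }

    // All I/O is deferred to initialize(), which is where IOException belongs.
    void initialize() throws IOException {
        this.epoch = store.readEpoch();
    }

    int epoch() {
        return epoch;
    }
}
```

   Callers then handle `IOException` once, at the `initialize()` call site, rather than at every construction site.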









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565672756



##
File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
##
@@ -25,20 +25,25 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class MockNetworkChannel implements NetworkChannel {
 private final AtomicInteger correlationIdCounter;
+private final Map addressCache;

Review comment:
   Ack. I considered it. I figured it might be useful to have the endpoints for any future tests, but it looks like `MockNetworkChannel` doesn't test anything endpoint-specific. Will remove.




