This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 10873e42101 KAFKA-18281: Kafka is improperly validating non-advertised 
listeners for routable controller addresses (#18387)
10873e42101 is described below

commit 10873e42101671e3e917f7d20bdd99870a700564
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Feb 25 10:51:28 2025 +0800

    KAFKA-18281: Kafka is improperly validating non-advertised listeners for 
routable controller addresses (#18387)
    
    When a cluster is configured with a dynamic controller quorum, KRaft 
replica's endpoint are computed using the advertised.listeners property and not 
the quorum.controller.voters property. This change in the configuration makes 
it difficult to keeping all previous node configurations compatible with the 
new endpoint discovery functionality.
    
    The least intrusive solution is to rely on Kafka's reverse hostname lookup 
when the hostname is not specified. The effective advertised controller 
listener now remove '0.0.0.0' hostname if the endpoint came from the listener 
configuration and not the advertised.listener configuration.
    
    Reviewers: José Armando García Sancio <[email protected]>, Alyssa Huang 
<[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 33 +++++++---
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 76 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 17b8cefb1ee..db5b84b58ae 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -519,29 +519,44 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   }
 
   def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
-    val controllerAdvertisedListeners = advertisedListeners.filter(l => 
controllerListenerNames.contains(l.listenerName.value()))
+    val advertisedListenersProp = 
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
+      CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+        .filter(l => controllerListenerNames.contains(l.listenerName.value()))
+    } else {
+      Seq.empty
+    }
     val controllerListenersValue = controllerListeners
 
     controllerListenerNames.flatMap { name =>
       controllerAdvertisedListeners
         .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
-        .orElse(controllerListenersValue.find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name))))
+        .orElse(
+          // If users don't define advertised.listeners, the advertised 
controller listeners inherit from listeners configuration
+          // which match listener names in controller.listener.names.
+          // Removing "0.0.0.0" host to avoid validation errors. This is to be 
compatible with the old behavior before 3.9.
+          // The null or "" host does a reverse lookup in 
ListenerInfo#withWildcardHostnamesResolved.
+          controllerListenersValue
+            .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
+            .map(endpoint => if (endpoint.host == "0.0.0.0") {
+              new EndPoint(null, endpoint.port, endpoint.listenerName, 
endpoint.securityProtocol)
+            } else {
+              endpoint
+            })
+        )
     }
   }
 
   def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
-    // Only expose broker listeners
-    advertisedListeners.filterNot(l => 
controllerListenerNames.contains(l.listenerName.value()))
-  }
-
-  // Use advertised listeners if defined, fallback to listeners otherwise
-  private def advertisedListeners: Seq[EndPoint] = {
+    // Use advertised listeners if defined, fallback to listeners otherwise
     val advertisedListenersProp = 
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
-    if (advertisedListenersProp != null) {
+    val advertisedListeners = if (advertisedListenersProp != null) {
       CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
     } else {
       listeners
     }
+    // Only expose broker listeners
+    advertisedListeners.filterNot(l => 
controllerListenerNames.contains(l.listenerName.value()))
   }
 
   private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, 
SecurityProtocol) = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 5c95f409846..70111b5fde8 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1230,6 +1230,82 @@ class KafkaConfigTest {
     KafkaConfig.fromProps(props)
   }
 
+  @Test
+  def testImplicitAllBindingListenersCanBeAdvertisedForBroker(): Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    val listeners = "PLAINTEXT://:9092"
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners) // explicitly setting it in broker
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
+
+    // Valid
+    KafkaConfig.fromProps(props)
+
+    // Also valid if we allow advertised listeners to derive from listeners
+    props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testExplicitAllBindingListenersCannotBeUsedForBroker(): Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    val listeners = "PLAINTEXT://0.0.0.0:9092"
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners) // explicitly setting it in KRaft
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
+
+    val expectedExceptionContainsText = "advertised.listeners cannot use the 
nonroutable meta-address 0.0.0.0. Use a routable IP address."
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // invalid if we allow advertised listeners to derive from listeners
+    props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+  }
+
+  @Test
+  def 
testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): 
Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    val listeners = "CONTROLLER://:9093"
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners) // explicitly setting it in KRaft
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+
+    // Valid
+    KafkaConfig.fromProps(props)
+
+    // Also valid if we allow advertised listeners to derive from 
listeners/controller.listener.names
+    props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def 
testExplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): 
Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    val listeners = "CONTROLLER://0.0.0.0:9093"
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners) // explicitly setting it in KRaft
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+
+    val expectedExceptionContainsText = "advertised.listeners cannot use the 
nonroutable meta-address 0.0.0.0. Use a routable IP address."
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // Valid if we allow advertised listeners to derive from 
listeners/controller.listener.names
+    props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    KafkaConfig.fromProps(props)
+  }
+
   @Test
   def testControllerListenersCanBeAdvertisedForKRaftCombined(): Unit = {
     val props = new Properties()

Reply via email to