This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1adb580  MINOR: KIP-631 KafkaConfig fixes and improvements (#10114)
1adb580 is described below

commit 1adb580faae89e0a298c0cb4ba08b238d91f9d03
Author: Colin Patrick McCabe <cmcc...@confluent.io>
AuthorDate: Thu Feb 11 21:35:24 2021 -0800

    MINOR: KIP-631 KafkaConfig fixes and improvements (#10114)
    
    Add the new KIP-631 configs to KafkaConfigTest to fix the test failure.
    
    Rename InitialBrokerRegistrationTimeoutMs to
    InitialBrokerRegistrationTimeoutMsProp for consistency with the other
    properties.
    
    Add ControllerListenerNamesProp as specified in KIP-631.
    
    Give nodeId and brokerId the same value in KafkaConfig.
    
    Reviewers: David Arthur <mum...@gmail.com
---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 59 ++++++++++++------
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  2 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 71 ++++++++++++++++++++++
 3 files changed, 114 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0ecb48c..2fd04ae 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -372,11 +372,12 @@ object KafkaConfig {
 
   /** KIP-500 Configuration */
   val ProcessRolesProp = "process.roles"
-  val InitialBrokerRegistrationTimeoutMs = 
"initial.broker.registration.timeout.ms"
+  val InitialBrokerRegistrationTimeoutMsProp = 
"initial.broker.registration.timeout.ms"
   val BrokerHeartbeatIntervalMsProp = "broker.heartbeat.interval.ms"
   val BrokerSessionTimeoutMsProp = "broker.session.timeout.ms"
   val NodeIdProp = "node.id"
   val MetadataLogDirProp = "metadata.log.dir"
+  val ControllerListenerNamesProp = "controller.listener.names"
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
@@ -672,6 +673,8 @@ object KafkaConfig {
     "This is required configuration when the self-managed quorum is enabled."
   val MetadataLogDirDoc = "This configuration determines where we put the 
metadata log for clusters upgraded to " +
     "KIP-500. If it is not set, the metadata log is placed in the first log 
directory from log.dirs."
+  val ControllerListenerNamesDoc = "A comma-separated list of the names of the 
listeners used by the KIP-500 controller. This is required " +
+    "if this process is a KIP-500 controller. The ZK-based controller will not 
use this configuration."
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that 
implements s${classOf[Authorizer].getName}" +
@@ -1073,10 +1076,11 @@ object KafkaConfig {
        */
       .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), 
ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
       .defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, 
NodeIdDoc)
-      .defineInternal(InitialBrokerRegistrationTimeoutMs, INT, 
Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, 
InitialBrokerRegistrationTimeoutMsDoc)
+      .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, 
Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, 
InitialBrokerRegistrationTimeoutMsDoc)
       .defineInternal(BrokerHeartbeatIntervalMsProp, INT, 
Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
       .defineInternal(BrokerSessionTimeoutMsProp, INT, 
Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
       .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, 
MetadataLogDirDoc)
+      .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, 
ControllerListenerNamesDoc)
 
       /************* Authorizer Configuration ***********/
       .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, 
LOW, AuthorizerClassNameDoc)
@@ -1506,10 +1510,17 @@ class KafkaConfig(val props: java.util.Map[_, _], 
doLog: Boolean, dynamicConfigO
   /** ********* General Configuration ***********/
   val brokerIdGenerationEnable: Boolean = 
getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
-  var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
-  val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
+  var brokerId: Int = {
+    val nodeId = getInt(KafkaConfig.NodeIdProp)
+    if (nodeId < 0) {
+      getInt(KafkaConfig.BrokerIdProp)
+    } else {
+      nodeId
+    }
+  }
+  val nodeId: Int = brokerId
   val processRoles: Set[ProcessRole] = parseProcessRoles()
-  val initialRegistrationTimeoutMs: Int = 
getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMs)
+  val initialRegistrationTimeoutMs: Int = 
getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
   val brokerHeartbeatIntervalMs: Int = 
getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
   val brokerSessionTimeoutMs: Int = 
getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
 
@@ -1797,6 +1808,12 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
     }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + 
":" + port, listenerSecurityProtocolMap))
   }
 
+  def controllerListenerNames: Seq[String] =
+    
Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("").split(",")
+
+  def controllerListeners: Seq[EndPoint] =
+    listeners.filter(l => 
controllerListenerNames.contains(l.listenerName.value()))
+
   def controlPlaneListener: Option[EndPoint] = {
     controlPlaneListenerName.map { listenerName =>
       listeners.filter(endpoint => endpoint.listenerName.value() == 
listenerName.value()).head
@@ -1804,9 +1821,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   }
 
   def dataPlaneListeners: Seq[EndPoint] = {
-    Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
-      case Some(controlPlaneListenerName) => 
listeners.filterNot(_.listenerName.value() == controlPlaneListenerName)
-      case None => listeners
+    listeners.filterNot { listener =>
+      val name = listener.listenerName.value()
+      name.equals(getString(KafkaConfig.ControlPlaneListenerNameProp)) ||
+        controllerListenerNames.contains(name)
     }
   }
 
@@ -1820,7 +1838,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
     else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null)
       CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + 
":" + advertisedPort, listenerSecurityProtocolMap, requireDistinctPorts=false)
     else
-      listeners
+      listeners.filterNot(l => 
controllerListenerNames.contains(l.listenerName.value()))
   }
 
   private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, 
SecurityProtocol) = {
@@ -1896,18 +1914,25 @@ class KafkaConfig(val props: java.util.Map[_, _], 
doLog: Boolean, dynamicConfigO
 
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
     val listenerNames = listeners.map(_.listenerName).toSet
-    require(advertisedListenerNames.contains(interBrokerListenerName),
-      s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name 
defined in ${KafkaConfig.AdvertisedListenersProp}. " +
-      s"The valid options based on currently configured listeners are 
${advertisedListenerNames.map(_.value).mkString(",")}")
-    require(advertisedListenerNames.subsetOf(listenerNames),
-      s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to 
or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
-      s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid 
options based on the current configuration " +
-      s"are ${listenerNames.map(_.value).mkString(",")}"
-    )
+    if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
+      require(advertisedListenerNames.contains(interBrokerListenerName),
+        s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name 
defined in ${KafkaConfig.AdvertisedListenersProp}. " +
+          s"The valid options based on currently configured listeners are 
${advertisedListenerNames.map(_.value).mkString(",")}")
+      require(advertisedListenerNames.subsetOf(listenerNames),
+        s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal 
to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
+          s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The 
valid options based on the current configuration " +
+          s"are ${listenerNames.map(_.value).mkString(",")}"
+      )
+    }
+
     require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
       s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable 
meta-address 0.0.0.0. "+
       s"Use a routable IP address.")
 
+    // Ensure controller listeners are not in the advertised listeners list
+    require(!controllerListeners.exists(advertisedListeners.contains),
+      s"${KafkaConfig.AdvertisedListenersProp} cannot contain any of 
${KafkaConfig.ControllerListenerNamesProp}")
+
     // validate controller.listener.name config
     if (controlPlaneListenerName.isDefined) {
       require(advertisedListenerNames.contains(controlPlaneListenerName.get),
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index a823ce6..7544a46 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -46,7 +46,7 @@ class BrokerLifecycleManagerTest {
     properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo")
     properties.setProperty(KafkaConfig.ProcessRolesProp, "broker")
     properties.setProperty(KafkaConfig.NodeIdProp, "1")
-    properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMs, 
"300000")
+    properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, 
"300000")
     properties
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 427bc13..d6c456b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -262,6 +262,30 @@ class KafkaConfigTest {
   }
 
   @Test
+  def testControllerListenerName() = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000,CONTROLLER://localhost:5000")
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"PLAINTEXT:PLAINTEXT,CONTROLPLANE:SSL,CONTROLLER:SASL_SSL")
+    props.put(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000")
+    props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLPLANE")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    assertTrue(isValidKafkaConfig(props))
+
+    val serverConfig = KafkaConfig.fromProps(props)
+    val controlPlaneEndpoint = serverConfig.controlPlaneListener.get
+    assertEquals("localhost", controlPlaneEndpoint.host)
+    assertEquals(4000, controlPlaneEndpoint.port)
+    assertEquals(SecurityProtocol.SSL, controlPlaneEndpoint.securityProtocol)
+
+    val controllerEndpoints = serverConfig.controllerListeners
+    assertEquals(1, controllerEndpoints.size)
+    val controllerEndpoint = controllerEndpoints.iterator.next()
+    assertEquals("localhost", controllerEndpoint.host)
+    assertEquals(5000, controllerEndpoint.port)
+    assertEquals(SecurityProtocol.SASL_SSL, 
controllerEndpoint.securityProtocol)
+  }
+
+  @Test
   def testBadListenerProtocol(): Unit = {
     val props = new Properties()
     props.put(KafkaConfig.BrokerIdProp, "1")
@@ -619,8 +643,13 @@ class KafkaConfigTest {
         case KafkaConfig.ConnectionSetupTimeoutMaxMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
 
           // KIP-500 Configurations
+        case KafkaConfig.ProcessRolesProp => // ignore
+        case KafkaConfig.InitialBrokerRegistrationTimeoutMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case KafkaConfig.BrokerHeartbeatIntervalMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case KafkaConfig.BrokerSessionTimeoutMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.NodeIdProp => assertPropertyInvalid(baseProperties, 
name, "not_a_number")
         case KafkaConfig.MetadataLogDirProp => // ignore string
+        case KafkaConfig.ControllerListenerNamesProp => // ignore string
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
         case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string
@@ -962,6 +991,48 @@ class KafkaConfigTest {
     }
   }
 
+  def assertDistinctControllerAndAdvertisedListeners(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
+    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://A:9092,SSL://B:9093")
+    // Valid now
+    assertTrue(isValidKafkaConfig(props))
+
+    // Still valid
+    val controllerListeners = "SASL_SSL"
+    props.put(KafkaConfig.ControllerListenerNamesProp, controllerListeners)
+    assertTrue(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def assertAllControllerListenerCannotBeAdvertised(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
+    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    // Valid now
+    assertTrue(isValidKafkaConfig(props))
+
+    // Invalid now
+    props.put(KafkaConfig.ControllerListenerNamesProp, 
"PLAINTEXT,SSL,SASL_SSL")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def assertEvenOneControllerListenerCannotBeAdvertised(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
+    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    // Valid now
+    assertTrue(isValidKafkaConfig(props))
+
+    // Invalid now
+    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
   @Test
   def testInvalidQuorumVotersConfig(): Unit = {
     assertInvalidQuorumVoters("1")

Reply via email to