dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684692820



##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do 
not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that 
controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated 
processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, 
"controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated 
processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, 
"controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(propertiesFile))
+  }

Review comment:
       ```
        // Ensure that if neither process.roles nor controller.quorum.voters is 
populated, then an exception is thrown if zookeeper.connect is not defined
        propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
        assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(propertiesFile))
   
        // Ensure that no exception is thrown once zookeeper.connect is defined
        propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
        KafkaConfig.fromProps(propertiesFile)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to