dielhennr commented on a change in pull request #11179: URL: https://github.com/apache/kafka/pull/11179#discussion_r684692820
########## File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala ########## @@ -79,6 +81,71 @@ class KafkaTest { assertThrows(classOf[FatalExitError], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2")))) } + @Test + def testBrokerRoleNodeIdValidation(): Unit = { + // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters + val propertiesFile = new Properties + propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker") + propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1") + propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092") + setListenerProps(propertiesFile) + assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile)) + + // Ensure that with a valid config no exception is thrown + propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2") + KafkaConfig.fromProps(propertiesFile) + } + + @Test + def testControllerRoleNodeIdValidation(): Unit = { + // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters + val propertiesFile = new Properties + propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller") + propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1") + propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092") + setListenerProps(propertiesFile) + assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile)) + + // Ensure that with a valid config no exception is thrown + propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2") + KafkaConfig.fromProps(propertiesFile) + } + + @Test + def testColocatedRoleNodeIdValidation(): Unit = { + // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters + val propertiesFile = new Properties 
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker") + propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1") + propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092") + setListenerProps(propertiesFile) + assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile)) + + // Ensure that with a valid config no exception is thrown + propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2") + KafkaConfig.fromProps(propertiesFile) + } + + @Test + def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = { + // Ensure that validation is happening at startup to check that controller.quorum.voters is not empty when process.roles is set + val propertiesFile = new Properties + propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker") + propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1") + propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "") + setListenerProps(propertiesFile) + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile)) + } Review comment: ``` // Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "") assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile)) // Ensure that no exception is thrown once zookeeper.connect is defined propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") KafkaConfig.fromProps(propertiesFile) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org