showuon commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684631273



##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do 
not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that 
controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated 
processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, 
"controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated 
processes use their node.id as a voter in controller.quorum.voters 

Review comment:
       I think this comment doesn't describe what this test actually verifies. 
Maybe update it to: "... to check that if the `ProcessRolesProp` property is 
set, `controller.quorum.voters` must not be empty"

##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do 
not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that 
controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated 
processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, 
"controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, 
"2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated 
processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, 
"controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(propertiesFile))
+  }

Review comment:
       nit: Maybe after this `assertThrows` we can also verify that no exception 
is thrown once the `ProcessRolesProp` value is removed, e.g.:
   ```scala
   assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(propertiesFile))
   
       // Ensure that if removing the ProcessRolesProp value, no exception is 
thrown
       propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
       KafkaConfig.fromProps(propertiesFile)
   ```
   What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to