Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


chia7712 merged PR #15744:
URL: https://github.com/apache/kafka/pull/15744


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2096872249

   [Latest test run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15744/13/tests) looks pretty reasonable. The failures all look unrelated.
   
   





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2096415182

   Thanks for the continued reviews, @chia7712. I've addressed your latest feedback.





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1591129328


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerZkVersionOpt: Option[Int] = if 
(!enableEntityConfigNoController) {

Review Comment:
   The name "enableEntityConfigNoController" is meant to convey "Enable setting 
entity configs even when there is no controller". But even as I've been coding 
this i've mixed up the meaning more than once  
   
   I'll go with `enableEntityConfigControllerCheck`






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1591129328


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerZkVersionOpt: Option[Int] = if 
(!enableEntityConfigNoController) {

Review Comment:
   The name "enableEntityConfigNoController" is meant to convey "Enable setting 
entity configs even when there is no controller". But even as I've been coding 
this i've mixed up the meaning more than once  
   
   How about: `requireEntityConfigControllerCheck`?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-06 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1591110984


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
 admin.alterUserScramCredentials(alterations)
   }
 
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-TestUtils.retry(1) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit 
= {
+maybeRetry(shouldRetry, 1) {
   val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test")
   assertEquals("204800", 
propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
   assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
 }
   }
 
-  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
-TestUtils.retry(1) {
-  assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, 
Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
-  assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, 
"").getProperty("consumer_byte_rate"))
-  assertEquals("800", zkClient.getEntityConfigs("users/" + 
Sanitizer.sanitize("user@1") + "/clients", 
"clientA").getProperty("consumer_byte_rate"))
-  assertEquals("100", zkClient.getEntityConfigs("users/" + 
Sanitizer.sanitize("user@1") + "/clients", 
"clientA").getProperty("producer_byte_rate"))
-  assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, 
"8.8.8.8").getProperty("connection_creation_rate"))
+  def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit 
= {
+maybeRetry(shouldRetry, 1) {
+  val defaultBrokerProps = zkClient.getEntityConfigs(ConfigType.BROKER, 
"")
+  assertEquals("8640", 
defaultBrokerProps.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+  val broker0Props = zkClient.getEntityConfigs(ConfigType.BROKER, "0")
+  assertEquals("4320", 
broker0Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+  val broker1Props = zkClient.getEntityConfigs(ConfigType.BROKER, "1")
+  assertEquals("4320", 
broker1Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
 }
   }
 
+  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {

Review Comment:
   Ah good catch! I did not mean to change this logic. Will fix.






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1590547523


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -1073,4 +1151,13 @@ class ZkMigrationIntegrationTest {
 kraftCluster.close()
 zkCluster.stop()
   }
+
+  def maybeRetry(shouldRetry: Boolean, maxWaitMs: Long)(block: => Unit): Unit 
= {

Review Comment:
   It seems we don't set `shouldRetry` to `false` in this test. Maybe this 
method is unnecessary?
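   For reference, a minimal sketch of what such a helper might look like, assuming the `TestUtils.retry(maxWaitMs)(block)` semantics used elsewhere in this test (a sketch, not the merged code):

```scala
// Run the block once, or keep retrying it via TestUtils.retry until it
// stops throwing (or maxWaitMs elapses) when shouldRetry is set.
def maybeRetry(shouldRetry: Boolean, maxWaitMs: Long)(block: => Unit): Unit = {
  if (shouldRetry) TestUtils.retry(maxWaitMs)(block)
  else block
}
```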



##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerZkVersionOpt: Option[Int] = if 
(!enableEntityConfigNoController) {

Review Comment:
   As this flag is used to guard against 1) no controller and 2) a KRaft controller, the naming enableEntityConfig`NoController` is a bit unsuitable. How about `enableEntityConfigCheck`?



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
 admin.alterUserScramCredentials(alterations)
   }
 
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-TestUtils.retry(1) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit 
= {
+maybeRetry(shouldRetry, 1) {
   val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test")
   assertEquals("204800", 
propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
   assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
 }
   }
 
-  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
-TestUtils.retry(1) {
-  assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, 
Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
-  assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, 
"").getProperty("consumer_byte_rate"))
-  assertEquals("800", zkClient.getEntityConfigs("users/" + 
Sanitizer.sanitize("user@1") + "/clients", 
"clientA").getProperty("consumer_byte_rate"))
-  assertEquals("100", zkClient.getEntityConfigs("users/" + 
Sanitizer.sanitize("user@1") + "/clients", 
"clientA").getProperty("producer_byte_rate"))
-  assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, 
"8.8.8.8").getProperty("connection_creation_rate"))
+  def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit 
= {
+maybeRetry(shouldRetry, 1) {
+  val defaultBrokerProps = zkClient.getEntityConfigs(ConfigType.BROKER, 
"")
+  assertEquals("8640", 
defaultBrokerProps.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+  val broker0Props = zkClient.getEntityConfigs(ConfigType.BROKER, "0")
+  assertEquals("4320", 
broker0Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+  val broker1Props = zkClient.getEntityConfigs(ConfigType.BROKER, "1")
+  assertEquals("4320", 
broker1Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
 }
   }
 
+  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {

Review Comment:
   Why remove the retry? It seems that makes `testDualWriteQuotaAndScram` unstable.



##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerZkVersionOpt: Option[Int] = if 
(!enableEntityConfigNoController) {
+  val controllerRegistration = getControllerRegistration match {
+case Some(registration) => registration
+case None =>
+  // This case is mainly here to make tests less flaky (by virtue of 
retries).
+  // In practice, we always expect a /controller ZNode to exist
+  throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+  }
+
+  // If there is a KRaft controller defined, don't even attempt this 
write. The broker will soon get a UMR
+  // from the new KRaft controller that lets it know about the new controller. It will then forward
+  // IncrementalAlterConfig requests instead of processing directly.

Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-05 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1590396620


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,42 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerZkVersion = if (!enableEntityConfigNoController) {

Review Comment:
   We still need to support `kafka-configs.sh --zookeeper` until 4.0. Previously, it was possible to set configs with the tools even if the cluster wasn't running. This flag lets us configure the whole KafkaZkClient in ConfigCommand to allow this behavior. Otherwise, it's disabled.
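   For context, here is roughly how that gating reads when assembled from the hunks quoted in this thread (a sketch using the pre-rename names from the PR as reviewed, not the merged code):

```scala
// Sketch assembled from the diffs quoted in this thread. When the flag is
// set (the kafka-configs.sh --zookeeper path), the controller check is
// skipped entirely; otherwise the write is fenced on the controller znode.
val controllerZkVersionOpt: Option[Int] =
  if (!enableEntityConfigNoController) {
    val registration = getControllerRegistration.getOrElse(
      // Mainly here to make tests less flaky; in practice /controller exists.
      throw new ControllerMovedException("Cannot set entity configs when there is no controller."))
    // A KRaft controller means the broker should forward the request
    // instead of writing to ZK directly.
    if (registration.kraftEpoch.exists(_ > 0))
      throw new ControllerMovedException("Cannot set entity configs directly when there is a KRaft controller.")
    Some(registration.zkVersion)
  } else {
    None
  }
```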






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-03 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1589865773


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2992,6 +2992,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 val preprocessingResponses = configManager.preprocess(original.data(),
   (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS, 
rType, rName))
 val remaining = 
ConfigAdminManager.copyWithoutPreprocessed(original.data(), 
preprocessingResponses)
+val isKRaftController = metadataSupport match {

Review Comment:
   Could you please add comments for this check? It seems to me race conditions always need good docs.



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -995,16 +1027,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin, shouldRetry: Boolean = false): Unit = {

Review Comment:
   ditto. please remove the unused default value.



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
 admin.alterUserScramCredentials(alterations)
   }
 
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-TestUtils.retry(1) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = 
false): Unit = {

Review Comment:
   It seems the default value `= false` is unused, so could you please remove 
it?



##
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##
@@ -92,7 +92,7 @@ public List getAdditionalExtensions() {
 if (clusterConfig.numControllers() != 1) {
 throw new IllegalArgumentException("For ZK clusters, please 
specify exactly 1 controller.");
 }
-ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, 
clusterReference);
+ZkClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, 
clusterReference);

Review Comment:
   Maybe we can revert this unrelated change?



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
 admin.alterUserScramCredentials(alterations)
   }
 
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-TestUtils.retry(1) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = 
false): Unit = {
+maybeRetry(shouldRetry, 1) {
   val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test")
   assertEquals("204800", 
propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
   assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
 }
   }
 
-  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
-TestUtils.retry(1) {
-  assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, 
Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
-  assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, 
"").getProperty("consumer_byte_rate"))
-  assertEquals("800", zkClient.getEntityConfigs("users/" + 
Sanitizer.sanitize("user@1") + "/clients", 
"clientA").getProperty("consumer_byte_rate"))
-  assertEquals("100", zkClient.getEntityConfigs("users/" + 
Sanitizer.sanitize("user@1") + "/clients", 
"clientA").getProperty("producer_byte_rate"))
-  assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, 
"8.8.8.8").getProperty("connection_creation_rate"))
+  def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = 
false): Unit = {

Review Comment:
   ditto. please remove the unused default value.



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -995,16 +1027,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin, shouldRetry: Boolean = false): Unit = {
+val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+val defaultBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "8640"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+val specificBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "4320"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+
+maybeRetry(shouldRetry, 6) {
+  val result = admin.incrementalAlterConfigs(Map(
+defaultBrokerResource -> defaultBrokerConfigs,

Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-03 Thread via GitHub


mumrah commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2093513532

   So many conflicts  





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-01 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1586349388


##
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##
@@ -106,6 +106,7 @@ public List getAdditionalExtensions() {
 },
 (AfterTestExecutionCallback) context -> clusterShim.stop(),
 new ClusterInstanceParameterResolver(clusterShim),
+new GenericParameterResolver<>(clusterShim, 
ZkClusterInstance.class),

Review Comment:
   Fair enough. I'll remove the new injection providers and just cast. 






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1584100563


##
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##
@@ -106,6 +106,7 @@ public List getAdditionalExtensions() {
 },
 (AfterTestExecutionCallback) context -> clusterShim.stop(),
 new ClusterInstanceParameterResolver(clusterShim),
+new GenericParameterResolver<>(clusterShim, 
ZkClusterInstance.class),

Review Comment:
   This change involves conflicts. Also, it seems to me this could encourage developers to use a specific type of `ClusterInstance`. I agree some test cases require a specific cluster type, but they can cast the `ClusterInstance` to either `zk` or `Raft`, so it seems to me that enabling injection is overkill.






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-29 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1583263129


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+val defaultBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+val specificBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+
+TestUtils.retry(6) {
+  val result = admin.incrementalAlterConfigs(Map(
+defaultBrokerResource -> defaultBrokerConfigs,
+broker0Resource -> specificBrokerConfigs,
+broker1Resource -> specificBrokerConfigs
+  ).asJava)
+  try {
+result.all().get(10, TimeUnit.SECONDS)
+  } catch {
+case t: Throwable => fail("Alter Broker Configs had an error", t)
+  }
+}

Review Comment:
   The reason for the retries in both tests is to deal with a few race 
conditions without making the test code overly complex. 
   
   `testDualWrite` blocks until the `/controller` ZNode is updated to the KRaft 
controller. However, the ZK brokers won't learn about this new controller until 
the migration finishes and they receive a UMR. So that's the first race.
   
   The second race is that brokers receive UMR independently, so we would have 
to wait for all the brokers to be in the same state regarding the controller. 
We'd have to get down to the MetadataCache to see this.
   
   The MetadataResponse never exposes any information about KRaft or the migration, so we can't use it to poll the brokers for a ready state.
   
   So basically, retrying the alter config requests until they succeed (up to a 
limit) will smooth over these timing issues.






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2080249371

   @mumrah I'm sorry but could you please fix the conflicts again?





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580175708


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest {
   // Alter the metadata
   log.info("Updating metadata with AdminClient")
   admin = zkCluster.createAdminClient()
-  alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
-  alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+  alterTopicConfig(admin)
+  alterClientQuotas(admin)
+  alterBrokerConfigs(admin)
 
   // Verify the changes made to KRaft are seen in ZK
   log.info("Verifying metadata changes with ZK")
   verifyTopicConfigs(zkClient)
   verifyClientQuotas(zkClient)
+  verifyBrokerConfigs(zkClient)
   val nextKRaftProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
   assertNotEquals(nextProducerId, nextKRaftProducerId)
-
+} catch {
+  case t: Throwable => fail("Uncaught error in test", t)

Review Comment:
   > I've seen many cases where it's hard to see why a test failed due to a throw.
   
   Could you please share the error stack with me?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580174626


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): 
Unit = {
+// Enable migration configs and restart brokers without KRaft quorum ready
+
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@localhost:")

Review Comment:
   Sorry that I just noticed this PR after merging the refactor-related PR.






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580150231


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): 
Unit = {
+// Enable migration configs and restart brokers without KRaft quorum ready
+
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@localhost:")

Review Comment:
   The config changes are killing me in this PR  






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580149700


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest {
   // Alter the metadata
   log.info("Updating metadata with AdminClient")
   admin = zkCluster.createAdminClient()
-  alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
-  alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+  alterTopicConfig(admin)
+  alterClientQuotas(admin)
+  alterBrokerConfigs(admin)
 
   // Verify the changes made to KRaft are seen in ZK
   log.info("Verifying metadata changes with ZK")
   verifyTopicConfigs(zkClient)
   verifyClientQuotas(zkClient)
+  verifyBrokerConfigs(zkClient)
   val nextKRaftProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
   assertNotEquals(nextProducerId, nextKRaftProducerId)
-
+} catch {
+  case t: Throwable => fail("Uncaught error in test", t)

Review Comment:
   Just my preference, I guess. I've seen many cases where it's hard to see why a test failed due to a throw. Maybe this is better in recent versions of JUnit?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-25 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580130359


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+val defaultBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+val specificBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+
+TestUtils.retry(6) {
+  val result = admin.incrementalAlterConfigs(Map(
+defaultBrokerResource -> defaultBrokerConfigs,
+broker0Resource -> specificBrokerConfigs,
+broker1Resource -> specificBrokerConfigs
+  ).asJava)
+  try {
+result.all().get(10, TimeUnit.SECONDS)
+  } catch {
+case t: Throwable => fail("Alter Broker Configs had an error", t)
+  }
+}

Review Comment:
   I have the same question here.
   1. In the test case `testIncrementalAlterConfigsPreMigration`, it should pass at once since the KRaft quorum is not ready.
   2. In the test `testDualWrite`, it should pass at once as we have waited for the migration to begin.



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -974,7 +1035,11 @@ class ZkMigrationIntegrationTest {
 quotas.add(new ClientQuotaAlteration(
   new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
   List(new ClientQuotaAlteration.Op("connection_creation_rate", 
10.0)).asJava))
-admin.alterClientQuotas(quotas)
+try {
+  admin.alterClientQuotas(quotas).all().get(10, TimeUnit.SECONDS)
+} catch {
+  case t: Throwable => fail("Alter Client Quotas had an error", t)

Review Comment:
   ditto. not sure why we need to catch it and then call `fail`



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest {
   // Alter the metadata
   log.info("Updating metadata with AdminClient")
   admin = zkCluster.createAdminClient()
-  alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
-  alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+  alterTopicConfig(admin)
+  alterClientQuotas(admin)
+  alterBrokerConfigs(admin)
 
   // Verify the changes made to KRaft are seen in ZK
   log.info("Verifying metadata changes with ZK")
   verifyTopicConfigs(zkClient)
   verifyClientQuotas(zkClient)
+  verifyBrokerConfigs(zkClient)
   val nextKRaftProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
   assertNotEquals(nextProducerId, nextKRaftProducerId)
-
+} catch {
+  case t: Throwable => fail("Uncaught error in test", t)

Review Comment:
   Why catch the error in this test case? The test case can still fail even if we don't catch it. Or do you plan to make it retryable via `TestUtils.retry`?



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): 
Unit = {
+// Enable migration configs and restart brokers without KRaft quorum ready
+
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@localhost:")

Review Comment:
   `RaftConfig` has been renamed to `QuorumConfig`.




Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-24 Thread via GitHub


showuon commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1577340553


##
core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala:
##
@@ -194,6 +199,8 @@ class AdminZkClientTest extends QuorumTestHarness with 
Logging with RackAwareTes
*/
   @Test
   def testTopicConfigChange(): Unit = {
+TestUtils.deleteControllerFromZk(zkClient)

Review Comment:
   I can understand we need to create the controller in ZK first to set configs, but I don't understand why we should delete the controller from ZK here. Might need some comments for it.






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-20 Thread via GitHub


soarez commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1573203822


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+val defaultBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+val specificBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+
+TestUtils.retry(6) {
+  val result = admin.incrementalAlterConfigs(Map(
+defaultBrokerResource -> defaultBrokerConfigs,
+broker0Resource -> specificBrokerConfigs,
+broker1Resource -> specificBrokerConfigs
+  ).asJava)
+  try {
+result.all().get(10, TimeUnit.SECONDS)
+  } catch {
+case t: Throwable => fail("Alter Broker Configs had an error", t)
+  }
+}

Review Comment:
   That makes sense. I'm just concerned this retry could hide instability in 
the operation we're trying to test. I'm thinking we could bypass the client API 
and retry just on the precondition of the controller being KRaft by e.g. 
reading the /controller znode, and then expect `incrementalAlterConfigs` to 
succeed at once. WDYT?
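   A sketch of that precondition poll (hypothetical helper; `getControllerRegistration` and `kraftEpoch` are taken from the hunks quoted in this thread, and this assumes they are reachable from the test):

```scala
// Hypothetical: block until /controller reports a KRaft controller
// (kraftEpoch > 0), so the subsequent incrementalAlterConfigs call would
// be expected to succeed on the first attempt.
TestUtils.waitUntilTrue(
  () => zkClient.getControllerRegistration.exists(_.kraftEpoch.exists(_ > 0)),
  "Timed out waiting for a KRaft controller to claim /controller")
```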






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2067204988

   Updated to include a CheckOp on the `/controller` ZNode. We don't bother using the controller epoch since it is not straightforward to consistently read the controller and controller epoch from a non-controller broker. We can achieve the same fencing by checking that the `/controller` ZNode doesn't change during the method.
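   Conceptually, the fencing amounts to a ZooKeeper multi-op like the sketch below (raw ZooKeeper API for illustration only; the PR goes through `KafkaZkClient`'s request pipeline instead):

```scala
import org.apache.zookeeper.{Op, ZooKeeper}

// Bundle the config write with a check on /controller: if the znode's
// version changed between reading it and issuing the write (e.g. a KRaft
// controller took over), the whole transaction fails and nothing is written.
def fencedSetConfig(zk: ZooKeeper, configPath: String, data: Array[Byte],
                    controllerZkVersion: Int): Unit = {
  zk.multi(java.util.Arrays.asList[Op](
    Op.check("/controller", controllerZkVersion), // fence on controller change
    Op.setData(configPath, data, -1)              // -1 matches any data version
  ))
}
```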
   
   





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572801229


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. 
The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new 
controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly 
when there is a KRaft controller.")
+}
 
 def set(configData: Array[Byte]): SetDataResponse = {
   val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
 configData, ZkVersion.MatchAnyVersion)
-  retryRequestUntilConnected(setDataRequest)
+  if (controllerRegistration.zkVersion > 0) {
+// Pass the zkVersion previously captured to ensure the controller 
hasn't changed to KRaft while
+// this method was processing.
+retryRequestUntilConnected(setDataRequest, 
controllerRegistration.zkVersion)
+  } else {
+retryRequestUntilConnected(setDataRequest)

Review Comment:
   On second thought, this isn't quite right. I need to rework this a bit.






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572780325


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. 
The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new 
controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly 
when there is a KRaft controller.")
+}
 
 def set(configData: Array[Byte]): SetDataResponse = {
   val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
 configData, ZkVersion.MatchAnyVersion)
-  retryRequestUntilConnected(setDataRequest)
+  if (controllerRegistration.zkVersion > 0) {
+// Pass the zkVersion previously captured to ensure the controller 
hasn't changed to KRaft while
+// this method was processing.
+retryRequestUntilConnected(setDataRequest, 
controllerRegistration.zkVersion)
+  } else {
+retryRequestUntilConnected(setDataRequest)

Review Comment:
   This is essentially the same as the `None` match case above. In our integration tests, we can (and apparently do) set configs in ZK before the controller gets elected for the first time. This was kept to avoid breaking a bunch of tests.
   
   Historically, setting configs would never fail (last writer wins), so neither the client nor the broker implements any retries.






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572777334


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. 
The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new 
controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly 
when there is a KRaft controller.")
+}
 
 def set(configData: Array[Byte]): SetDataResponse = {
   val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
 configData, ZkVersion.MatchAnyVersion)
-  retryRequestUntilConnected(setDataRequest)
+  if (controllerRegistration.zkVersion > 0) {

Review Comment:
   Yes. This makes the update here more like the updates made from the ZK 
controller. 






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572776248


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+val defaultBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+val specificBrokerConfigs = Seq(
+  new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), 
AlterConfigOp.OpType.SET),
+).asJavaCollection
+
+TestUtils.retry(6) {
+  val result = admin.incrementalAlterConfigs(Map(
+defaultBrokerResource -> defaultBrokerConfigs,
+broker0Resource -> specificBrokerConfigs,
+broker1Resource -> specificBrokerConfigs
+  ).asJava)
+  try {
+result.all().get(10, TimeUnit.SECONDS)
+  } catch {
+case t: Throwable => fail("Alter Broker Configs had an error", t)
+  }
+}

Review Comment:
   > Could we verify that a KRaft controller is ready before testing this 
operation to make the outcome more predictable?
   
   I tried this at first, but we don't actually expose this to the client 
(whether the controller is ZK or KRaft). In fact, we lie to the client and tell 
it that a random broker is the controller. I could have "pierced the veil" to 
access the underlying controller classes, but a retry seemed easier. 






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


cmccabe commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572722514


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. 
The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new 
controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly 
when there is a KRaft controller.")
+}
 
 def set(configData: Array[Byte]): SetDataResponse = {
   val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
 configData, ZkVersion.MatchAnyVersion)
-  retryRequestUntilConnected(setDataRequest)
+  if (controllerRegistration.zkVersion > 0) {
+// Pass the zkVersion previously captured to ensure the controller 
hasn't changed to KRaft while
+// this method was processing.
+retryRequestUntilConnected(setDataRequest, 
controllerRegistration.zkVersion)
+  } else {
+retryRequestUntilConnected(setDataRequest)

Review Comment:
   Can you explain how we'd get to this branch of the "if" statement? Surely the controller znode always has a version?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


cmccabe commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572717868


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. 
The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new 
controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly 
when there is a KRaft controller.")
+}
 
 def set(configData: Array[Byte]): SetDataResponse = {
   val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
 configData, ZkVersion.MatchAnyVersion)
-  retryRequestUntilConnected(setDataRequest)
+  if (controllerRegistration.zkVersion > 0) {

Review Comment:
   This also fixes the lack of fencing, right?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


cmccabe commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572716567


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode

Review Comment:
   I was a bit surprised that `ControllerMovedException` was not retriable. 
Should we use a retriable exception here, or does it not matter?









Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


soarez commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572295590


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
 dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "")
+val defaultBrokerConfigs = Seq(
+  new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "8640"), AlterConfigOp.OpType.SET),
+).asJavaCollection
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+val specificBrokerConfigs = Seq(
+  new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "4320"), AlterConfigOp.OpType.SET),
+).asJavaCollection
+
+TestUtils.retry(6) {
+  val result = admin.incrementalAlterConfigs(Map(
+defaultBrokerResource -> defaultBrokerConfigs,
+broker0Resource -> specificBrokerConfigs,
+broker1Resource -> specificBrokerConfigs
+  ).asJava)
+  try {
+result.all().get(10, TimeUnit.SECONDS)
+  } catch {
+case t: Throwable => fail("Alter Broker Configs had an error", t)
+  }
+}

Review Comment:
   Do we really need the retry logic here? Could we verify that a KRaft 
controller is ready before testing this operation to make the outcome more 
predictable? Or is it expected that we may have to repeat the operation a few 
times regardless? And if that's the case should we update docs too to let 
operators know they might have to try a few times?
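
   One way to make it deterministic, as a sketch and assuming describeMetadataQuorum is answerable at this point in the migration, would be to poll for a quorum leader before issuing the alter-configs call:

   import java.util.concurrent.TimeUnit
   import org.apache.kafka.clients.admin.Admin

   // Wait for the KRaft quorum to report a leader instead of retrying
   // the alter-configs call itself.
   def waitForKRaftController(admin: Admin, timeoutMs: Long = 30000): Unit = {
     val deadline = System.currentTimeMillis() + timeoutMs
     while (System.currentTimeMillis() < deadline) {
       val quorum = admin.describeMetadataQuorum().quorumInfo().get(5, TimeUnit.SECONDS)
       if (quorum.leaderId() >= 0) return
       Thread.sleep(200)
     }
     throw new AssertionError("KRaft controller did not become active in time")
   }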






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-19 Thread via GitHub


showuon commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1572140166


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
*/
  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {

Review Comment:
   Could we add some tests for these changes?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569214270


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3003,9 +3008,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
+// Forwarding has not happened yet, so handle both ZK and KRaft cases here
 if (remaining.resources().isEmpty) {
   sendResponse(Some(new IncrementalAlterConfigsResponseData()))
-} else if ((!request.isForwarded) && metadataSupport.canForward()) {
+} else if ((!request.isForwarded) && metadataSupport.canForward() && isKRaftController) {

Review Comment:
   It's kind of inconsequential that the ZK controller was handling a forwarded 
IncrementalAlterConfigs. All of the IAC handling is done in KafkaApis and never 
even checks if it's the active controller. 
   
   Basically when handling the broker-specific configs, the broker handling the 
forwarded request (i.e., the ZK controller) would see that the broker ID didn't 
match and fail. 
   
   TBH, a better fix would be to always forward to the ZK controller, and 
update the ZK brokers to react to the ZNode change event to process BROKER and 
BROKER_LOGGER changes. But that's a much bigger change :-/ 
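
   A rough sketch of that watch-based alternative, with an assumed config-znode layout (the real dynamic-config plumbing in Kafka is more involved):

   import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
   import org.apache.zookeeper.Watcher.Event.EventType

   // The broker watches its own config znode and re-applies
   // BROKER/BROKER_LOGGER settings whenever a forwarded write lands.
   class BrokerConfigWatcher(zk: ZooKeeper, brokerId: Int) extends Watcher {
     private val path = s"/config/brokers/$brokerId" // assumed layout

     def start(): Unit = rearm()

     private def rearm(): Unit = {
       val data = zk.getData(path, this, null) // ZK watches are one-shot, so re-register
       applyConfig(data)
     }

     override def process(event: WatchedEvent): Unit =
       if (event.getType == EventType.NodeDataChanged) rearm()

     private def applyConfig(data: Array[Byte]): Unit = {
       // Placeholder: parse the bytes as properties and apply them as dynamic configs.
     }
   }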






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569208039


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
*/
  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.")
+}

Review Comment:
   I don't think this method is in the dual-write path. All of the ZK writes 
for configs in dual-write mode should happen in ZkConfigMigrationClient






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


akhileshchg commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569190991


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
*/
  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.")
+}

Review Comment:
   Wouldn't this fail when the KRaft controller tries to perform a dual write to 
both the KRaft log and ZK?



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3003,9 +3008,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
+// Forwarding has not happened yet, so handle both ZK and KRaft cases here
 if (remaining.resources().isEmpty) {
   sendResponse(Some(new IncrementalAlterConfigsResponseData()))
-} else if ((!request.isForwarded) && metadataSupport.canForward()) {
+} else if ((!request.isForwarded) && metadataSupport.canForward() && isKRaftController) {

Review Comment:
   I have one question. Why can't the ZK controller take care of the forwarded 
alterConfig requests for other BROKER config entities?






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569103206


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
*/
  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode

Review Comment:
   I found that the integration test would fail occasionally because the config 
change would happen before an active controller was seen (so zkVersion was 0). 
Adding this exception allows the test to retry the alter config call.
   
   In production, this could only be hit if someone was actively calling alter 
configs as the cluster was being deployed and the call was handled in between 
the time that the first broker came up and when the controller became active. 






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


johnnychhsu commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569005655


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
*/
  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode

Review Comment:
   May I know why the test could be flaky without this?






[PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


mumrah opened a new pull request, #15744:
URL: https://github.com/apache/kafka/pull/15744

   This patch fixes two issues with IncrementalAlterConfigs and the ZK 
migration. First, it changes the handling of IncrementalAlterConfigs to check 
if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a 
check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not 
directly modifying configs in ZK if there is a KRaft controller. This closes 
the race condition between KRaft taking over as the active controller and the 
ZK brokers learning about this.
   
   
   
   During the ZK migration, there is a time when the ZK brokers are running 
with migrations enabled, but KRaft has yet to take over as the controller.  
Prior to KRaft taking over as the controller, the ZK brokers in migration mode 
were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK 
controller. This works for some config types, but breaks when setting BROKER 
and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for 
IAC was to always forward if the forwarding manager was defined. Since ZK 
brokers in migration mode have forwarding enabled, the forwarding would happen, 
and the special logic for BROKER and BROKER_LOGGER would be missed, causing the 
request to fail.
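   
   For illustration, the kind of request that hit this is a broker-specific update like the following sketch (the broker id, logger name, and level are made-up examples):
   
   import java.util.Collections
   import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
   import org.apache.kafka.common.config.ConfigResource
   
   // A BROKER_LOGGER change targeted at broker "1" must be applied by
   // broker 1 itself; unconditionally forwarding it made the request fail.
   def setLoggerLevel(admin: Admin): Unit = {
     val resource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "1")
     val ops: java.util.Collection[AlterConfigOp] =
       Collections.singleton(new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApis", "DEBUG"), AlterConfigOp.OpType.SET))
     admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get()
   }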
   
   With this fix, the IAC handler will check if the controller is KRaft or ZK 
and only forward for KRaft.
   
    
   
   As part of KIP-500, we moved most (but not all) ZK mutations to the ZK 
controller. One of the things we did not move fully to the controller was 
entity configs. This is because there was some special logic that needed to run 
on the broker for certain config updates. If a broker-specific config was set, 
AdminClient would route the request to the proper broker. In KRaft, we have a 
different mechanism for handling broker-specific config updates.
   
   Leaving this ZK update on the broker side would be okay if we were guarding 
writes on the controller epoch, but it turns out 
KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" 
updates to ZK. This means a ZK broker could update the contents of ZK _after_ 
the metadata had been migrated to KRaft. No good! To fix this, this patch adds 
a check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but 
also adds logic to fail the update if the controller is a KRaft controller.
   
   The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a 
new error code that can be returned while updating configs.
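   
   As a quick sanity check of that mapping (a sketch, assuming ControllerMovedException still maps to this error code in the clients library):
   
   import org.apache.kafka.common.errors.ControllerMovedException
   import org.apache.kafka.common.protocol.Errors
   
   // The exception thrown in setOrCreateEntityConfigs surfaces to clients
   // as the STALE_CONTROLLER_EPOCH error code.
   object ErrorMappingDemo extends App {
     println(Errors.forException(new ControllerMovedException("controller moved")))
     // prints: STALE_CONTROLLER_EPOCH
   }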

