This is an automated email from the ASF dual-hosted git repository.

edocomar pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 85a2d1a6fd5 KAFKA-19026: AlterConfigPolicy incompatibility between ZK 
mode and KR… (#19263)
85a2d1a6fd5 is described below

commit 85a2d1a6fd51935cb8a736fe55f24fde394ca91f
Author: Edoardo Comar <[email protected]>
AuthorDate: Tue Dec 23 09:38:40 2025 +0000

    KAFKA-19026: AlterConfigPolicy incompatibility between ZK mode and KR… 
(#19263)
    
    …aft mode when using AlterConfigOp.OpType.SUBTRACT
    
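    Modified ZkAdminManager.scala so that, when the new configuration
    alter.config.policy.kraft.compatibility.enable is set to true (it
    defaults to false), on OpType.SUBTRACT the policy receives the
    resulting configs, as happens in KRaft mode.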
    
    This similarly fixes the OpType.APPEND differences.
    
    Note that the policy behavior on OpType.DELETE differs between Broker
    and Topic resources: for topics the policy sees no map entry for the
    deleted config, whereas for brokers the entry is present with a null
    value. This was the existing behavior in KRaft mode, and this commit
    does not change it.
    
    Added a ClusterTest (AlterConfigPolicyConfigsTest).
---
 .../main/scala/kafka/server/ZkAdminManager.scala   |  15 +-
 .../kafka/server/AlterConfigPolicyConfigsTest.java | 295 +++++++++++++++++++++
 .../kafka/server/config/ServerLogConfigs.java      |   6 +
 .../kafka/storage/internals/log/LogConfig.java     |   1 +
 4 files changed, 313 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala 
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index dc4c5d06522..a41d71bb2ec 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -48,8 +48,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, 
ZooKeeperInternals}
-import 
org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG
-import 
org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG
+import 
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
 ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, 
CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
 import org.apache.kafka.storage.internals.log.LogConfig
 
 import scala.collection.{Map, mutable, _}
@@ -84,6 +83,8 @@ class ZkAdminManager(val config: KafkaConfig,
 
   private val alterConfigPolicy =
     Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, 
classOf[AlterConfigPolicy]))
+  // KIP-1252 compatibility flag
+  private val alterConfigPolicyKraftCompatibilityEnabled = 
config.getBoolean(ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG)
 
   def hasDelayedTopicOperations: Boolean = topicPurgatory.numDelayed != 0
 
@@ -503,8 +504,6 @@ class ZkAdminManager(val config: KafkaConfig,
   def incrementalAlterConfigs(configs: Map[ConfigResource, 
Seq[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
     configs.map { case (resource, alterConfigOps) =>
       try {
-        val configEntriesMap = alterConfigOps.map(entry => 
(entry.configEntry.name, entry.configEntry.value)).toMap
-
         resource.`type` match {
           case ConfigResource.Type.TOPIC =>
             if (resource.name.isEmpty) {
@@ -512,6 +511,10 @@ class ZkAdminManager(val config: KafkaConfig,
             }
             val configProps = 
adminZkClient.fetchEntityConfig(ConfigType.TOPIC, resource.name)
             prepareIncrementalConfigs(alterConfigOps, configProps, 
LogConfig.configKeys.asScala)
+            val configEntriesMap = 
if(alterConfigPolicyKraftCompatibilityEnabled)
+              alterConfigOps.map(entry => (entry.configEntry.name, 
configProps.getProperty(entry.configEntry.name))).filter(x => x._2 != 
null).toMap
+            else
+              alterConfigOps.map(entry => (entry.configEntry.name, 
entry.configEntry.value)).toMap
             alterTopicConfigs(resource, validateOnly, configProps, 
configEntriesMap)
 
           case ConfigResource.Type.BROKER =>
@@ -523,6 +526,10 @@ class ZkAdminManager(val config: KafkaConfig,
 
             val configProps = 
this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
             prepareIncrementalConfigs(alterConfigOps, configProps, 
KafkaConfig.configKeys)
+            val configEntriesMap = 
if(alterConfigPolicyKraftCompatibilityEnabled)
+              alterConfigOps.map(entry => (entry.configEntry.name, 
configProps.getProperty(entry.configEntry.name))).toMap
+            else
+              alterConfigOps.map(entry => (entry.configEntry.name, 
entry.configEntry.value)).toMap
             alterBrokerConfigs(resource, validateOnly, configProps, 
configEntriesMap)
 
           case resourceType =>
diff --git a/core/src/test/java/kafka/server/AlterConfigPolicyConfigsTest.java 
b/core/src/test/java/kafka/server/AlterConfigPolicyConfigsTest.java
new file mode 100644
index 00000000000..3ec093dfc7b
--- /dev/null
+++ b/core/src/test/java/kafka/server/AlterConfigPolicyConfigsTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.server.policy.AlterConfigPolicy;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import static 
org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG;
+import static 
org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(serverProperties = {
+        @ClusterConfigProperty(key = ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, 
value = "kafka.server.AlterConfigPolicyConfigsTest$Policy"),
+})
+@ExtendWith(value = ClusterTestExtensions.class)
+public class AlterConfigPolicyConfigsTest {
+
+    @BeforeEach
+    public void setUp() {
+        Policy.lastConfig = null;
+    }
+
+    @ClusterTest(
+            types = {Type.ZK},
+            serverProperties = {
+                    @ClusterConfigProperty(key = 
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+            })
+    public void 
testPolicyAlterBrokerConfigSubtractCompatibityEnabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterBrokerConfigSubtract(clusterInstance, true);
+    }
+
+    @ClusterTest
+    public void 
testPolicyAlterBrokerConfigSubtractCompatibityDisabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterBrokerConfigSubtract(clusterInstance, false);
+    }
+
+    public void testPolicyAlterBrokerConfigSubtract(ClusterInstance 
clusterInstance, boolean compatibilityMode) throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            clusterInstance.waitForReadyBrokers();
+
+            AlterConfigOp alterConfigOp = new AlterConfigOp(
+                    new ConfigEntry(SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
"foo"),
+                    AlterConfigOp.OpType.SUBTRACT);
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = 
Collections.singletonMap(
+                    new ConfigResource(ConfigResource.Type.BROKER, "0"),
+                    Collections.singletonList(alterConfigOp));
+            admin.incrementalAlterConfigs(alterConfigs).all().get();
+
+            if (clusterInstance.isKRaftTest() || compatibilityMode) {
+                assertEquals("", 
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+            } else {
+                assertEquals("foo", 
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+            }
+        }
+    }
+
+    @ClusterTest
+    public void testPolicyAlterBrokerConfigAppend(ClusterInstance 
clusterInstance) throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            clusterInstance.waitForReadyBrokers();
+
+            AlterConfigOp alterConfigOp = new AlterConfigOp(
+                    new ConfigEntry(SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
"foo,bar"),
+                    AlterConfigOp.OpType.APPEND);
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = 
Collections.singletonMap(
+                    new ConfigResource(ConfigResource.Type.BROKER, "0"),
+                    Collections.singletonList(alterConfigOp));
+            admin.incrementalAlterConfigs(alterConfigs).all().get();
+            assertEquals("foo,bar", 
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+        }
+    }
+
+    @ClusterTest
+    public void testPolicyAlterBrokerConfigSet(ClusterInstance 
clusterInstance) throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            clusterInstance.waitForReadyBrokers();
+
+            AlterConfigOp alterConfigOp = new AlterConfigOp(
+                    new ConfigEntry(SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
"foo"),
+                    AlterConfigOp.OpType.SET);
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = 
Collections.singletonMap(
+                    new ConfigResource(ConfigResource.Type.BROKER, "0"),
+                    Collections.singletonList(alterConfigOp));
+            admin.incrementalAlterConfigs(alterConfigs).all().get();
+            assertEquals("foo", 
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+        }
+    }
+
+    @ClusterTest(
+            types = {Type.ZK},
+            serverProperties = {
+                    @ClusterConfigProperty(key = 
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+            })
+    public void 
testPolicyAlterBrokerConfigDeleteCompatibityEnabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterBrokerConfigDelete(clusterInstance, true);
+    }
+
+    @ClusterTest
+    public void 
testPolicyAlterBrokerConfigDeleteCompatibityDisabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterBrokerConfigDelete(clusterInstance, false);
+    }
+
+    public void testPolicyAlterBrokerConfigDelete(ClusterInstance 
clusterInstance, boolean compatibilityMode) throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            clusterInstance.waitForReadyBrokers();
+
+            AlterConfigOp alterConfigOp = new AlterConfigOp(
+                    new ConfigEntry(SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
"unused"),
+                    AlterConfigOp.OpType.DELETE);
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = 
Collections.singletonMap(
+                    new ConfigResource(ConfigResource.Type.BROKER, "0"),
+                    Collections.singletonList(alterConfigOp));
+            admin.incrementalAlterConfigs(alterConfigs).all().get();
+            
assertTrue(Policy.lastConfig.containsKey(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+
+            if (clusterInstance.isKRaftTest() || compatibilityMode) {
+                
assertNull(Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+            } else {
+                assertEquals("unused", 
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+            }
+        }
+    }
+
+    @ClusterTest(
+            types = {Type.ZK},
+            serverProperties = {
+                    @ClusterConfigProperty(key = 
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+            })
+    public void 
testPolicyAlterTopicConfigSubtractCompatibityEnabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterTopicConfigSubtract(clusterInstance, true);
+    }
+
+    @ClusterTest
+    public void 
testPolicyAlterTopicConfigSubtractCompatibityDisabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterTopicConfigSubtract(clusterInstance, false);
+    }
+
+    public void testPolicyAlterTopicConfigSubtract(ClusterInstance 
clusterInstance, boolean compatibilityMode) throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            admin.createTopics(Collections.singleton(new NewTopic("topic1", 1, 
(short) 1))).all().get();
+            clusterInstance.waitForTopic("topic1", 1);
+
+            AlterConfigOp alterConfigOp = new AlterConfigOp(
+                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, "foo"),
+                    AlterConfigOp.OpType.SUBTRACT);
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = 
Collections.singletonMap(
+                    new ConfigResource(ConfigResource.Type.TOPIC, "topic1"),
+                    Collections.singletonList(alterConfigOp));
+            admin.incrementalAlterConfigs(alterConfigs).all().get();
+            if (clusterInstance.isKRaftTest() || compatibilityMode) {
+                assertEquals("delete", 
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+            } else {
+                assertEquals("foo", 
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+            }
+        }
+    }
+
+    @ClusterTest(
+            types = {Type.ZK},
+            serverProperties = {
+                    @ClusterConfigProperty(key = 
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+            })
+    public void 
testPolicyAlterTopicConfigAppendCompatibityEnabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterTopicConfigAppend(clusterInstance, true);
+    }
+
+    @ClusterTest
+    public void 
testPolicyAlterTopicConfigAppendCompatibityDisabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterTopicConfigAppend(clusterInstance, false);
+    }
+
+    public void testPolicyAlterTopicConfigAppend(ClusterInstance 
clusterInstance, boolean compatibilityMode) throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            admin.createTopics(Collections.singleton(new NewTopic("topic1", 1, 
(short) 1))).all().get();
+            clusterInstance.waitForTopic("topic1", 1);
+
+            AlterConfigOp alterConfigOp = new AlterConfigOp(
+                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, 
"compact"),
+                    AlterConfigOp.OpType.APPEND);
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = 
Collections.singletonMap(
+                    new ConfigResource(ConfigResource.Type.TOPIC, "topic1"),
+                    Collections.singletonList(alterConfigOp));
+            admin.incrementalAlterConfigs(alterConfigs).all().get();
+            if (clusterInstance.isKRaftTest() || compatibilityMode) {
+                assertEquals("delete,compact", 
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+            } else {
+                assertEquals("compact", 
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+            }
+        }
+    }
+
+    @ClusterTest
+    public void testPolicyAlterTopicConfigSet(ClusterInstance clusterInstance) 
throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            admin.createTopics(Collections.singleton(new NewTopic("topic1", 1, 
(short) 1))).all().get();
+            clusterInstance.waitForTopic("topic1", 1);
+
+            AlterConfigOp alterConfigOp = new AlterConfigOp(
+                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, 
"compact"),
+                    AlterConfigOp.OpType.SET);
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = 
Collections.singletonMap(
+                    new ConfigResource(ConfigResource.Type.TOPIC, "topic1"),
+                    Collections.singletonList(alterConfigOp));
+            admin.incrementalAlterConfigs(alterConfigs).all().get();
+            assertEquals("compact", 
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        }
+    }
+
+    @ClusterTest(
+            types = {Type.ZK},
+            serverProperties = {
+                    @ClusterConfigProperty(key = 
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+            })
+    public void 
testPolicyAlterTopicConfigDeleteCompatibityEnabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterTopicConfigDelete(clusterInstance, true);
+    }
+
+    @ClusterTest
+    public void 
testPolicyAlterTopicConfigDeleteCompatibityDisabled(ClusterInstance 
clusterInstance) throws Exception {
+        testPolicyAlterTopicConfigDelete(clusterInstance, false);
+    }
+
+    public void testPolicyAlterTopicConfigDelete(ClusterInstance 
clusterInstance, boolean compatibilityMode) throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            admin.createTopics(Collections.singleton(new NewTopic("topic1", 1, 
(short) 1))).all().get();
+            clusterInstance.waitForTopic("topic1", 1);
+
+            AlterConfigOp alterConfigOp = new AlterConfigOp(
+                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, 
"unused"),
+                    AlterConfigOp.OpType.DELETE);
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = 
Collections.singletonMap(
+                    new ConfigResource(ConfigResource.Type.TOPIC, "topic1"),
+                    Collections.singletonList(alterConfigOp));
+            admin.incrementalAlterConfigs(alterConfigs).all().get();
+            if (clusterInstance.isKRaftTest() || compatibilityMode) {
+                
assertFalse(Policy.lastConfig.containsKey(TopicConfig.CLEANUP_POLICY_CONFIG));
+            } else {
+                
assertTrue(Policy.lastConfig.containsKey(TopicConfig.CLEANUP_POLICY_CONFIG));
+            }
+        }
+    }
+
+    public static class Policy implements AlterConfigPolicy {
+        public static Map<String, String> lastConfig;
+
+        @Override
+        public void validate(AlterConfigPolicy.RequestMetadata 
requestMetadata) throws PolicyViolationException {
+            assertNull(lastConfig);
+            lastConfig = requestMetadata.configs();
+        }
+
+        @Override
+        public void close() throws Exception {}
+        @Override
+        public void configure(Map<String, ?> configs) {}
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index c6e0810262c..142ac96163a 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -178,6 +178,12 @@ public class ServerLogConfigs {
     public static final String ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG = 
"alter.config.policy.class.name";
     public static final String ALTER_CONFIG_POLICY_CLASS_NAME_DOC = "The alter 
configs policy class that should be used for validation. The class should " +
             "implement the 
<code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface.";
+    public static final String 
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG = 
"alter.config.policy.kraft.compatibility.enable";
+    public static final boolean 
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_DEFAULT = false;
+    public static final String 
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_DOC = "This configuration 
controls whether " +
+            "for incremental alter config operations of type SUBTRACT or 
DELETE on a config entry of type LIST, " +
+            "an incremental alter config policy will invoked consistently 
regardless of whether Kafka is running in KRaft or Zookeeper mode.";
+
     public static final String LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG);
     public static final boolean LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT = 
true;
     public static final String LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This 
configuration controls whether " +
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 3299b6e89d7..695b8d51e4f 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -255,6 +255,7 @@ public class LogConfig extends AbstractConfig {
             
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), 
MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
             .define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, 
CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
             .define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, 
CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
+            
.define(ServerLogConfigs.ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, 
BOOLEAN, 
ServerLogConfigs.ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_DEFAULT, LOW, 
ServerLogConfigs.ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_DOC)
             .define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, 
BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, 
ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
             .define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, 
ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, 
ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC)
             .defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, 
LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, 
ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC);

Reply via email to