This is an automated email from the ASF dual-hosted git repository.
edocomar pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 85a2d1a6fd5 KAFKA-19026: AlterConfigPolicy incompatibility between ZK
mode and KR… (#19263)
85a2d1a6fd5 is described below
commit 85a2d1a6fd51935cb8a736fe55f24fde394ca91f
Author: Edoardo Comar <[email protected]>
AuthorDate: Tue Dec 23 09:38:40 2025 +0000
KAFKA-19026: AlterConfigPolicy incompatibility between ZK mode and KRaft
mode when using AlterConfigOp.OpType.SUBTRACT (#19263)
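Modified ZkAdminManager.scala so that, when the new configuration
alter.config.policy.kraft.compatibility.enable is set to true (it defaults
to false), on OpType.SUBTRACT the policy receives the modified configs, as
happens in KRaft mode.
This similarly fixes the OpType.APPEND differences.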
Note that the policy behavior on OpType.DELETE differs between Broker and
Topic resources: for topics the policy does not see a map entry for the
deleted config, whereas for brokers the entry is present with a null value.
This was the existing behavior for KRaft and this commit does not change that.
Added a ClusterTest (AlterConfigPolicyConfigsTest) covering these cases.
---
.../main/scala/kafka/server/ZkAdminManager.scala | 15 +-
.../kafka/server/AlterConfigPolicyConfigsTest.java | 295 +++++++++++++++++++++
.../kafka/server/config/ServerLogConfigs.java | 6 +
.../kafka/storage/internals/log/LogConfig.java | 1 +
4 files changed, 313 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index dc4c5d06522..a41d71bb2ec 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -48,8 +48,7 @@ import
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.{ConfigType, QuotaConfigs,
ZooKeeperInternals}
-import
org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG
-import
org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG
+import
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG,
CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.{Map, mutable, _}
@@ -84,6 +83,8 @@ class ZkAdminManager(val config: KafkaConfig,
private val alterConfigPolicy =
Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
classOf[AlterConfigPolicy]))
+ // KIP-1252 compatibility flag
+ private val alterConfigPolicyKraftCompatibilityEnabled =
config.getBoolean(ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG)
def hasDelayedTopicOperations: Boolean = topicPurgatory.numDelayed != 0
@@ -503,8 +504,6 @@ class ZkAdminManager(val config: KafkaConfig,
def incrementalAlterConfigs(configs: Map[ConfigResource,
Seq[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
configs.map { case (resource, alterConfigOps) =>
try {
- val configEntriesMap = alterConfigOps.map(entry =>
(entry.configEntry.name, entry.configEntry.value)).toMap
-
resource.`type` match {
case ConfigResource.Type.TOPIC =>
if (resource.name.isEmpty) {
@@ -512,6 +511,10 @@ class ZkAdminManager(val config: KafkaConfig,
}
val configProps =
adminZkClient.fetchEntityConfig(ConfigType.TOPIC, resource.name)
prepareIncrementalConfigs(alterConfigOps, configProps,
LogConfig.configKeys.asScala)
+ val configEntriesMap =
if(alterConfigPolicyKraftCompatibilityEnabled)
+ alterConfigOps.map(entry => (entry.configEntry.name,
configProps.getProperty(entry.configEntry.name))).filter(x => x._2 !=
null).toMap
+ else
+ alterConfigOps.map(entry => (entry.configEntry.name,
entry.configEntry.value)).toMap
alterTopicConfigs(resource, validateOnly, configProps,
configEntriesMap)
case ConfigResource.Type.BROKER =>
@@ -523,6 +526,10 @@ class ZkAdminManager(val config: KafkaConfig,
val configProps =
this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
prepareIncrementalConfigs(alterConfigOps, configProps,
KafkaConfig.configKeys)
+ val configEntriesMap =
if(alterConfigPolicyKraftCompatibilityEnabled)
+ alterConfigOps.map(entry => (entry.configEntry.name,
configProps.getProperty(entry.configEntry.name))).toMap
+ else
+ alterConfigOps.map(entry => (entry.configEntry.name,
entry.configEntry.value)).toMap
alterBrokerConfigs(resource, validateOnly, configProps,
configEntriesMap)
case resourceType =>
diff --git a/core/src/test/java/kafka/server/AlterConfigPolicyConfigsTest.java
b/core/src/test/java/kafka/server/AlterConfigPolicyConfigsTest.java
new file mode 100644
index 00000000000..3ec093dfc7b
--- /dev/null
+++ b/core/src/test/java/kafka/server/AlterConfigPolicyConfigsTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.server.policy.AlterConfigPolicy;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import static
org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG;
+import static
org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(serverProperties = {
+ @ClusterConfigProperty(key = ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
value = "kafka.server.AlterConfigPolicyConfigsTest$Policy"),
+})
+@ExtendWith(value = ClusterTestExtensions.class)
+public class AlterConfigPolicyConfigsTest {
+
+ @BeforeEach
+ public void setUp() {
+ Policy.lastConfig = null;
+ }
+
+ @ClusterTest(
+ types = {Type.ZK},
+ serverProperties = {
+ @ClusterConfigProperty(key =
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+ })
+ public void
testPolicyAlterBrokerConfigSubtractCompatibityEnabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterBrokerConfigSubtract(clusterInstance, true);
+ }
+
+ @ClusterTest
+ public void
testPolicyAlterBrokerConfigSubtractCompatibityDisabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterBrokerConfigSubtract(clusterInstance, false);
+ }
+
+ public void testPolicyAlterBrokerConfigSubtract(ClusterInstance
clusterInstance, boolean compatibilityMode) throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ clusterInstance.waitForReadyBrokers();
+
+ AlterConfigOp alterConfigOp = new AlterConfigOp(
+ new ConfigEntry(SslConfigs.SSL_CIPHER_SUITES_CONFIG,
"foo"),
+ AlterConfigOp.OpType.SUBTRACT);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs =
Collections.singletonMap(
+ new ConfigResource(ConfigResource.Type.BROKER, "0"),
+ Collections.singletonList(alterConfigOp));
+ admin.incrementalAlterConfigs(alterConfigs).all().get();
+
+ if (clusterInstance.isKRaftTest() || compatibilityMode) {
+ assertEquals("",
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+ } else {
+ assertEquals("foo",
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+ }
+ }
+ }
+
+ @ClusterTest
+ public void testPolicyAlterBrokerConfigAppend(ClusterInstance
clusterInstance) throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ clusterInstance.waitForReadyBrokers();
+
+ AlterConfigOp alterConfigOp = new AlterConfigOp(
+ new ConfigEntry(SslConfigs.SSL_CIPHER_SUITES_CONFIG,
"foo,bar"),
+ AlterConfigOp.OpType.APPEND);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs =
Collections.singletonMap(
+ new ConfigResource(ConfigResource.Type.BROKER, "0"),
+ Collections.singletonList(alterConfigOp));
+ admin.incrementalAlterConfigs(alterConfigs).all().get();
+ assertEquals("foo,bar",
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+ }
+ }
+
+ @ClusterTest
+ public void testPolicyAlterBrokerConfigSet(ClusterInstance
clusterInstance) throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ clusterInstance.waitForReadyBrokers();
+
+ AlterConfigOp alterConfigOp = new AlterConfigOp(
+ new ConfigEntry(SslConfigs.SSL_CIPHER_SUITES_CONFIG,
"foo"),
+ AlterConfigOp.OpType.SET);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs =
Collections.singletonMap(
+ new ConfigResource(ConfigResource.Type.BROKER, "0"),
+ Collections.singletonList(alterConfigOp));
+ admin.incrementalAlterConfigs(alterConfigs).all().get();
+ assertEquals("foo",
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+ }
+ }
+
+ @ClusterTest(
+ types = {Type.ZK},
+ serverProperties = {
+ @ClusterConfigProperty(key =
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+ })
+ public void
testPolicyAlterBrokerConfigDeleteCompatibityEnabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterBrokerConfigDelete(clusterInstance, true);
+ }
+
+ @ClusterTest
+ public void
testPolicyAlterBrokerConfigDeleteCompatibityDisabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterBrokerConfigDelete(clusterInstance, false);
+ }
+
+ public void testPolicyAlterBrokerConfigDelete(ClusterInstance
clusterInstance, boolean compatibilityMode) throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ clusterInstance.waitForReadyBrokers();
+
+ AlterConfigOp alterConfigOp = new AlterConfigOp(
+ new ConfigEntry(SslConfigs.SSL_CIPHER_SUITES_CONFIG,
"unused"),
+ AlterConfigOp.OpType.DELETE);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs =
Collections.singletonMap(
+ new ConfigResource(ConfigResource.Type.BROKER, "0"),
+ Collections.singletonList(alterConfigOp));
+ admin.incrementalAlterConfigs(alterConfigs).all().get();
+
assertTrue(Policy.lastConfig.containsKey(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+
+ if (clusterInstance.isKRaftTest() || compatibilityMode) {
+
assertNull(Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+ } else {
+ assertEquals("unused",
Policy.lastConfig.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG));
+ }
+ }
+ }
+
+ @ClusterTest(
+ types = {Type.ZK},
+ serverProperties = {
+ @ClusterConfigProperty(key =
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+ })
+ public void
testPolicyAlterTopicConfigSubtractCompatibityEnabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterTopicConfigSubtract(clusterInstance, true);
+ }
+
+ @ClusterTest
+ public void
testPolicyAlterTopicConfigSubtractCompatibityDisabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterTopicConfigSubtract(clusterInstance, false);
+ }
+
+ public void testPolicyAlterTopicConfigSubtract(ClusterInstance
clusterInstance, boolean compatibilityMode) throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ admin.createTopics(Collections.singleton(new NewTopic("topic1", 1,
(short) 1))).all().get();
+ clusterInstance.waitForTopic("topic1", 1);
+
+ AlterConfigOp alterConfigOp = new AlterConfigOp(
+ new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, "foo"),
+ AlterConfigOp.OpType.SUBTRACT);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs =
Collections.singletonMap(
+ new ConfigResource(ConfigResource.Type.TOPIC, "topic1"),
+ Collections.singletonList(alterConfigOp));
+ admin.incrementalAlterConfigs(alterConfigs).all().get();
+ if (clusterInstance.isKRaftTest() || compatibilityMode) {
+ assertEquals("delete",
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+ } else {
+ assertEquals("foo",
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+ }
+ }
+ }
+
+ @ClusterTest(
+ types = {Type.ZK},
+ serverProperties = {
+ @ClusterConfigProperty(key =
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+ })
+ public void
testPolicyAlterTopicConfigAppendCompatibityEnabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterTopicConfigAppend(clusterInstance, true);
+ }
+
+ @ClusterTest
+ public void
testPolicyAlterTopicConfigAppendCompatibityDisabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterTopicConfigAppend(clusterInstance, false);
+ }
+
+ public void testPolicyAlterTopicConfigAppend(ClusterInstance
clusterInstance, boolean compatibilityMode) throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ admin.createTopics(Collections.singleton(new NewTopic("topic1", 1,
(short) 1))).all().get();
+ clusterInstance.waitForTopic("topic1", 1);
+
+ AlterConfigOp alterConfigOp = new AlterConfigOp(
+ new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
"compact"),
+ AlterConfigOp.OpType.APPEND);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs =
Collections.singletonMap(
+ new ConfigResource(ConfigResource.Type.TOPIC, "topic1"),
+ Collections.singletonList(alterConfigOp));
+ admin.incrementalAlterConfigs(alterConfigs).all().get();
+ if (clusterInstance.isKRaftTest() || compatibilityMode) {
+ assertEquals("delete,compact",
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+ } else {
+ assertEquals("compact",
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+ }
+ }
+ }
+
+ @ClusterTest
+ public void testPolicyAlterTopicConfigSet(ClusterInstance clusterInstance)
throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ admin.createTopics(Collections.singleton(new NewTopic("topic1", 1,
(short) 1))).all().get();
+ clusterInstance.waitForTopic("topic1", 1);
+
+ AlterConfigOp alterConfigOp = new AlterConfigOp(
+ new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
"compact"),
+ AlterConfigOp.OpType.SET);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs =
Collections.singletonMap(
+ new ConfigResource(ConfigResource.Type.TOPIC, "topic1"),
+ Collections.singletonList(alterConfigOp));
+ admin.incrementalAlterConfigs(alterConfigs).all().get();
+ assertEquals("compact",
Policy.lastConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+ }
+ }
+
+ @ClusterTest(
+ types = {Type.ZK},
+ serverProperties = {
+ @ClusterConfigProperty(key =
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG, value = "true"),
+ })
+ public void
testPolicyAlterTopicConfigDeleteCompatibityEnabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterTopicConfigDelete(clusterInstance, true);
+ }
+
+ @ClusterTest
+ public void
testPolicyAlterTopicConfigDeleteCompatibityDisabled(ClusterInstance
clusterInstance) throws Exception {
+ testPolicyAlterTopicConfigDelete(clusterInstance, false);
+ }
+
+ public void testPolicyAlterTopicConfigDelete(ClusterInstance
clusterInstance, boolean compatibilityMode) throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ admin.createTopics(Collections.singleton(new NewTopic("topic1", 1,
(short) 1))).all().get();
+ clusterInstance.waitForTopic("topic1", 1);
+
+ AlterConfigOp alterConfigOp = new AlterConfigOp(
+ new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
"unused"),
+ AlterConfigOp.OpType.DELETE);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs =
Collections.singletonMap(
+ new ConfigResource(ConfigResource.Type.TOPIC, "topic1"),
+ Collections.singletonList(alterConfigOp));
+ admin.incrementalAlterConfigs(alterConfigs).all().get();
+ if (clusterInstance.isKRaftTest() || compatibilityMode) {
+
assertFalse(Policy.lastConfig.containsKey(TopicConfig.CLEANUP_POLICY_CONFIG));
+ } else {
+
assertTrue(Policy.lastConfig.containsKey(TopicConfig.CLEANUP_POLICY_CONFIG));
+ }
+ }
+ }
+
+ public static class Policy implements AlterConfigPolicy {
+ public static Map<String, String> lastConfig;
+
+ @Override
+ public void validate(AlterConfigPolicy.RequestMetadata
requestMetadata) throws PolicyViolationException {
+ assertNull(lastConfig);
+ lastConfig = requestMetadata.configs();
+ }
+
+ @Override
+ public void close() throws Exception {}
+ @Override
+ public void configure(Map<String, ?> configs) {}
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index c6e0810262c..142ac96163a 100644
---
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -178,6 +178,12 @@ public class ServerLogConfigs {
public static final String ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG =
"alter.config.policy.class.name";
public static final String ALTER_CONFIG_POLICY_CLASS_NAME_DOC = "The alter
configs policy class that should be used for validation. The class should " +
"implement the
<code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface.";
+ public static final String
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG =
"alter.config.policy.kraft.compatibility.enable";
+ public static final boolean
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_DEFAULT = false;
+ public static final String
ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_DOC = "This configuration
controls whether " +
+ "for incremental alter config operations of type SUBTRACT or
DELETE on a config entry of type LIST, " +
+ "an incremental alter config policy will invoked consistently
regardless of whether Kafka is running in KRaft or Zookeeper mode.";
+
public static final String LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG);
public static final boolean LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT =
true;
public static final String LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This
configuration controls whether " +
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 3299b6e89d7..695b8d51e4f 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -255,6 +255,7 @@ public class LogConfig extends AbstractConfig {
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG,
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0),
MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
.define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG,
CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
+
.define(ServerLogConfigs.ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_CONFIG,
BOOLEAN,
ServerLogConfigs.ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_DEFAULT, LOW,
ServerLogConfigs.ALTER_CONFIG_POLICY_KRAFT_COMPATIBILITY_ENABLE_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG,
BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW,
ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
.define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG,
ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW,
ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC)
.defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG,
LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW,
ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC);