This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 288b4799eea KAFKA-19655: Align the behavior of num.partitions and
default.replication.factor for topic creation (#21550)
288b4799eea is described below
commit 288b4799eeac959e9f5b8691cdb4adb25b23d1c3
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Mar 4 07:43:36 2026 +0800
KAFKA-19655: Align the behavior of num.partitions and
default.replication.factor for topic creation (#21550)
Currently, 'num.partitions' and 'default.replication.factor' are applied
inconsistently. Topic auto-creation relies on Broker configs, while
Streams and AdminClient rely on Controller configs. This leads to
confusing behavior where a Broker and Controller might have diverging
defaults.
This commit implements the 4.x transition phase:
- Updated DefaultAutoTopicCreationManager to check if these configs are
explicitly set in 'broker.properties'.
- If NOT explicitly set, the Broker now sends 'NO_NUM_PARTITIONS' and
'NO_REPLICATION_FACTOR' (-1) in the CreateTopicsRequest, allowing
the Controller's configuration to take precedence.
- Added deprecation warnings in KafkaConfig when these properties are
defined in a Broker role, notifying users to migrate them to the
Controller role before 5.0.
- Updated documentation to clarify the precedence logic between
Broker and Controller nodes.
Reviewers: Luke Chen <[email protected]>
---
.../kafka/clients/admin/AutoTopicCreationTest.java | 112 +++++++++++++++++++++
.../kafka/server/AutoTopicCreationManager.scala | 12 ++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 8 ++
.../server/AutoTopicCreationManagerTest.scala | 12 +--
.../kafka/server/config/ServerLogConfigs.java | 11 +-
.../kafka/server/config/ReplicationConfigs.java | 12 ++-
6 files changed, 156 insertions(+), 11 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AutoTopicCreationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AutoTopicCreationTest.java
new file mode 100644
index 00000000000..9417de1d07a
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AutoTopicCreationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ClusterTestDefaults(types = {Type.KRAFT}, brokers = 2)
+public class AutoTopicCreationTest {
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(id = 0, key = "num.partitions", value =
"5"),
+ @ClusterConfigProperty(id = 0, key = "default.replication.factor",
value = "2"),
+ @ClusterConfigProperty(id = 1, key = "num.partitions", value =
"5"),
+ @ClusterConfigProperty(id = 1, key = "default.replication.factor",
value = "2"),
+ }
+ )
+ public void testAutoCreateTopicWithExplicitBrokerConfig(ClusterInstance
cluster) throws Exception {
+ String topic = "explicit-broker-topic";
+ triggerAutoCreateTopic(cluster, topic);
+ try (Admin admin = cluster.admin()) {
+ TopicDescription desc =
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
+ assertEquals(5, desc.partitions().size(),
+ "num.partitions explicitly set on broker should be used");
+ assertEquals(2, desc.partitions().get(0).replicas().size(),
+ "default.replication.factor explicitly set on broker should be
used");
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(id = 3000, key = "num.partitions", value =
"5"),
+ @ClusterConfigProperty(id = 3000, key =
"default.replication.factor", value = "2"),
+ }
+ )
+ public void testAutoCreateTopicWithImplicitBrokerConfig(ClusterInstance
cluster) throws Exception {
+ String topic = "implicit-broker-topic";
+ triggerAutoCreateTopic(cluster, topic);
+ try (Admin admin = cluster.admin()) {
+ TopicDescription desc =
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
+ assertEquals(5, desc.partitions().size(),
+ "Controller num.partitions should be used when broker does not
explicitly set it");
+ assertEquals(2, desc.partitions().get(0).replicas().size(),
+ "Controller default.replication.factor should be used when
broker does not explicitly set it");
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(id = 0, key = "num.partitions", value =
"5"),
+ @ClusterConfigProperty(id = 1, key = "num.partitions", value =
"5"),
+ @ClusterConfigProperty(id = 3000, key =
"default.replication.factor", value = "2"),
+ }
+ )
+ public void testAutoCreateTopicWithMixedConfig(ClusterInstance cluster)
throws Exception {
+ String topic = "mixed-config-topic";
+ triggerAutoCreateTopic(cluster, topic);
+ try (Admin admin = cluster.admin()) {
+ TopicDescription desc =
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
+ assertEquals(5, desc.partitions().size(),
+ "num.partitions explicitly set on broker should be used");
+ assertEquals(2, desc.partitions().get(0).replicas().size(),
+ "Controller default.replication.factor should be used when
broker does not set it");
+ }
+ }
+
+ @ClusterTest
+ public void testAutoCreateTopicWithDefaultConfig(ClusterInstance cluster)
throws Exception {
+ String topic = "default-config-topic";
+ triggerAutoCreateTopic(cluster, topic);
+ try (Admin admin = cluster.admin()) {
+ TopicDescription desc =
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
+ assertEquals(1, desc.partitions().size(),
+ "Default num.partitions of 1 should be used when neither
broker nor controller sets it");
+ assertEquals(1, desc.partitions().get(0).replicas().size(),
+ "Default default.replication.factor of 1 should be used when
neither broker nor controller sets it");
+ }
+ }
+
+ private void triggerAutoCreateTopic(ClusterInstance cluster, String topic)
throws Exception {
+ // Sends a produce request to a non-existent topic so that auto topic
creation is triggered.
+ try (Producer<byte[], byte[]> producer = cluster.producer()) {
+ ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>(topic, null, "key".getBytes(),
"value".getBytes());
+ producer.send(record).get();
+ }
+ }
+}
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 6f2a192a438..2d81c3694bf 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.requests.{CreateTopicsRequest,
CreateTopicsRespon
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.quota.ControllerMutationQuota
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.TopicCreator
@@ -270,10 +271,17 @@ class DefaultAutoTopicCreationManager(
.setReplicationFactor(config.shareCoordinatorConfig.shareCoordinatorStateTopicReplicationFactor())
.setConfigs(convertToTopicConfigCollections(shareCoordinator.shareGroupStateTopicConfigs()))
case topicName =>
+ val numPartitions: java.lang.Integer =
+ if
(config.originals.containsKey(ServerLogConfigs.NUM_PARTITIONS_CONFIG))
config.numPartitions
+ else CreateTopicsRequest.NO_NUM_PARTITIONS
+ val replicationFactor: java.lang.Short =
+ if
(config.originals.containsKey(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG))
config.defaultReplicationFactor.toShort
+ else CreateTopicsRequest.NO_REPLICATION_FACTOR
+
new CreatableTopic()
.setName(topicName)
- .setNumPartitions(config.numPartitions)
- .setReplicationFactor(config.defaultReplicationFactor.shortValue)
+ .setNumPartitions(numPartitions)
+ .setReplicationFactor(replicationFactor)
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c7beff0504b..0e1a3678227 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -584,6 +584,14 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
// warn if create.topic.policy.class.name or
alter.config.policy.class.name is defined in the broker role
warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole,
ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole,
ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG)
+ if (originals.containsKey(ServerLogConfigs.NUM_PARTITIONS_CONFIG)) {
+ warn(s"${ServerLogConfigs.NUM_PARTITIONS_CONFIG} is defined in the
broker role. This configuration will be ignored in 5.0. " +
+ s"Please set ${ServerLogConfigs.NUM_PARTITIONS_CONFIG} in the
controller role instead.")
+ }
+ if
(originals.containsKey(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)) {
+ warn(s"${ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG} is
defined in the broker role. This configuration will be ignored in 5.0. " +
+ s"Please set ${ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG}
in the controller role instead.")
+ }
} else if (processRoles == Set(ProcessRole.ControllerRole)) {
// KRaft controller-only
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
diff --git
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 5deaa94306b..5b436c627b1 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -125,13 +125,13 @@ class AutoTopicCreationManagerTest {
val props = TestUtils.createBrokerConfig(1)
props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeout.toString)
-
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicPartitions.toString)
-
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicPartitions.toString)
-
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG
, internalTopicPartitions.toString)
+
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicReplicationFactor.toString)
+
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicReplicationFactor.toString)
+
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG
, internalTopicReplicationFactor.toString)
- props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
-
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
-
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
+ props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
internalTopicPartitions.toString)
+
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
internalTopicPartitions.toString)
+
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG,
internalTopicPartitions.toString)
config = KafkaConfig.fromProps(props)
val aliveBrokers = util.List.of(new Node(0, "host0", 0), new Node(1,
"host1", 1))
diff --git
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index dbfe8b13eed..d8ffd8a5e2f 100644
---
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -34,7 +34,16 @@ import static
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFI
public class ServerLogConfigs {
public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
public static final int NUM_PARTITIONS_DEFAULT = 1;
- public static final String NUM_PARTITIONS_DOC = "The default number of log
partitions per topic";
+ public static final String NUM_PARTITIONS_DOC =
+ "The default number of log partitions per topic. This configuration
affects the following paths:"
+ + "<ul>"
+ + " <li>1. Auto topic creation</li>"
+ + " <li>2. Internal streams topic creation</li>"
+ + " <li>3. Topic creation via <code>AdminClient#createTopics</code>
when the number of partition is set to -1</li>"
+ + "</ul>"
+ + "<p>For (1), the value from the broker configuration is used only
when it is explicitly set. "
+ + "If it is not explicitly configured on the broker, the value from
the controller configuration is used.<br/>"
+ + "For (2) and (3), the value from the controller configuration is
always used.</p>";
public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
diff --git
a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
index d39a141a458..664eae9448e 100644
---
a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
+++
b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
@@ -40,8 +40,16 @@ public class ReplicationConfigs {
public static final String DEFAULT_REPLICATION_FACTOR_CONFIG =
"default.replication.factor";
public static final int REPLICATION_FACTOR_DEFAULT = 1;
- public static final String DEFAULT_REPLICATION_FACTOR_DOC = "The
replication factor for automatically created topics," +
- " and for topics created with -1 as the replication factor";
+ public static final String DEFAULT_REPLICATION_FACTOR_DOC =
+ "The default replication factor per topic. This configuration affects
the following paths:"
+ + "<ul>"
+ + " <li>1. Auto topic creation</li>"
+ + " <li>2. Internal streams topic creation</li>"
+ + " <li>3. Topic creation via <code>AdminClient#createTopics</code>
when the replication factor is set to -1</li>"
+ + "</ul>"
+ + "<p>For (1), the value from the broker configuration is used only
when it is explicitly set. "
+ + "If it is not explicitly configured on the broker, the value from
the controller configuration is used.<br/>"
+ + "For (2) and (3), the value from the controller configuration is
always used.</p>";
public static final String REPLICA_LAG_TIME_MAX_MS_CONFIG =
"replica.lag.time.max.ms";
public static final long REPLICA_LAG_TIME_MAX_MS_DEFAULT = 30000L;