This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4cdd4b617c2 KAFKA-19071: Fix doc for remote.storage.enable (#19345)
4cdd4b617c2 is described below
commit 4cdd4b617c2ca2805050c7ec26048e58b05d9055
Author: Azhar Ahmed <[email protected]>
AuthorDate: Sun Apr 13 20:08:49 2025 -0700
KAFKA-19071: Fix doc for remote.storage.enable (#19345)
As of 3.9, Kafka allows disabling remote storage on a topic after it was
enabled. It allows subsequent enabling and disabling too.
However the documentation says otherwise and needs to be corrected.
Doc:
https://kafka.apache.org/39/documentation/#topicconfigs_remote.storage.enable
Reviewers: Luke Chen <[email protected]>, PoAn Yang <[email protected]>,
Ken Huang <[email protected]>
---
.../apache/kafka/common/config/TopicConfig.java | 5 ++--
.../kafka/admin/RemoteTopicCrudTest.scala | 31 +++++++++++++++++++++-
2 files changed, 33 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index d8043e9be9d..f3157bb1b1a 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -80,8 +80,9 @@ public class TopicConfig {
"Moreover, it triggers the rolling of new segment if the retention.ms
condition is satisfied.";
public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG =
"remote.storage.enable";
- public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable
tiered storage for a topic, set this configuration as true. " +
- "You can not disable this config once it is enabled. It will be
provided in future versions.";
+ public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable
tiered storage for a topic, set this configuration to true. " +
+ "To disable tiered storage for a topic that has it enabled, set
this configuration to false. " +
+ "When disabling, you must also set
<code>remote.log.delete.on.disable</code> to true.";
public static final String LOCAL_LOG_RETENTION_MS_CONFIG =
"local.retention.ms";
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of
milliseconds to keep the local log segment before it gets deleted. " +
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index d89a83c7750..4f66dd9e311 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -441,7 +441,36 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
AlterConfigOp.OpType.SET),
))
assertThrowsException(classOf[InvalidConfigurationException],
- () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling
remote storage feature on the topic level is not supported.")
+ () => admin.incrementalAlterConfigs(configs).all().get(), "It is invalid
to disable remote storage without deleting remote data. " +
+ "If you want to keep the remote data and turn to read only, please set
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
+ "If you want to disable remote storage and delete all remote data,
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def
testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(quorum:
String): Unit = {
+ val admin = createAdminClient()
+ val topicConfig = new Properties
+ topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true")
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+ util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"),
+ AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
+ AlterConfigOp.OpType.SET)
+ ))
+ admin.incrementalAlterConfigs(configs).all().get()
+
+ val newProps = new Properties()
+ configs.get(new ConfigResource(ConfigResource.Type.TOPIC,
testTopicName)).forEach { op =>
+ newProps.setProperty(op.configEntry().name(), op.configEntry().value())
+ }
+
+ verifyRemoteLogTopicConfigs(newProps)
}
@ParameterizedTest