This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4cdd4b617c2 KAFKA-19071: Fix doc for remote.storage.enable (#19345)
4cdd4b617c2 is described below

commit 4cdd4b617c2ca2805050c7ec26048e58b05d9055
Author: Azhar Ahmed <[email protected]>
AuthorDate: Sun Apr 13 20:08:49 2025 -0700

    KAFKA-19071: Fix doc for remote.storage.enable (#19345)
    
    As of 3.9, Kafka allows disabling remote storage on a topic after it was
    enabled. It allows subsequent enabling and disabling too.
    
    However, the documentation says otherwise and needs to be corrected.
    
    Doc: https://kafka.apache.org/39/documentation/#topicconfigs_remote.storage.enable
    
    Reviewers: Luke Chen <[email protected]>, PoAn Yang <[email protected]>, Ken Huang <[email protected]>
---
 .../apache/kafka/common/config/TopicConfig.java    |  5 ++--
 .../kafka/admin/RemoteTopicCrudTest.scala          | 31 +++++++++++++++++++++-
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index d8043e9be9d..f3157bb1b1a 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -80,8 +80,9 @@ public class TopicConfig {
         "Moreover, it triggers the rolling of new segment if the retention.ms 
condition is satisfied.";
 
     public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = 
"remote.storage.enable";
-    public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable 
tiered storage for a topic, set this configuration as true. " +
-            "You can not disable this config once it is enabled. It will be 
provided in future versions.";
+    public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable 
tiered storage for a topic, set this configuration to true. " +
+            "To disable tiered storage for a topic that has it enabled, set 
this configuration to false. " +
+            "When disabling, you must also set 
<code>remote.log.delete.on.disable</code> to true.";
 
     public static final String LOCAL_LOG_RETENTION_MS_CONFIG = 
"local.retention.ms";
     public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of 
milliseconds to keep the local log segment before it gets deleted. " +
diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index d89a83c7750..4f66dd9e311 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -441,7 +441,36 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
           AlterConfigOp.OpType.SET),
       ))
     assertThrowsException(classOf[InvalidConfigurationException],
-      () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling 
remote storage feature on the topic level is not supported.")
+      () => admin.incrementalAlterConfigs(configs).all().get(), "It is invalid 
to disable remote storage without deleting remote data. " +
+        "If you want to keep the remote data and turn to read only, please set 
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
+        "If you want to disable remote storage and delete all remote data, 
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def 
testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(quorum: 
String): Unit = {
+    val admin = createAdminClient()
+    val topicConfig = new Properties
+    topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true")
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    val configs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+      util.Arrays.asList(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
+          AlterConfigOp.OpType.SET)
+      ))
+    admin.incrementalAlterConfigs(configs).all().get()
+
+    val newProps = new Properties()
+    configs.get(new ConfigResource(ConfigResource.Type.TOPIC, 
testTopicName)).forEach { op =>
+      newProps.setProperty(op.configEntry().name(), op.configEntry().value())
+    }
+
+    verifyRemoteLogTopicConfigs(newProps)
   }
 
   @ParameterizedTest

Reply via email to