kamalcph commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1296143313


##########
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##########
@@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness {
     server.shutdown()
   }
 
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit 
= {
+    val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 true.toString)
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+      
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+      "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+    val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+    server.remoteLogManagerOpt match {
+      case Some(_) =>
+      case None => fail("RemoteLogManager should be initialized")
+    }
+
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
true.toString)
+
+    TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)
+
+    server.shutdown()
+
+    val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+    assertThrows(classOf[ConfigException], () => 
TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps)))
+  }
+
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithDisabledTieredTopic(): Unit 
= {
+    val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 true.toString)
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+      
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+      "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+    var server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+    server.remoteLogManagerOpt match {
+      case Some(_) =>
+      case None => fail("RemoteLogManager should be initialized")
+    }
+
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
false.toString)
+
+    TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)
+
+    server.shutdown()
+
+    val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+    server = TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps))
+
+    server.shutdown()
+  }
+
+  @Test
+  def 
testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringEnabled(): Unit 
= {
+    val serverProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+    val server = TestUtils.createServer(KafkaConfig.fromProps(serverProps))
+
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
true.toString)
+
+    TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)
+
+    server.shutdown()
+
+    assertThrows(classOf[ConfigException], () => 
TestUtils.createServer(KafkaConfig.fromProps(serverProps)))
+  }
+
+  @Test
+  def 
testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringDisabled(): Unit 
= {

Review Comment:
   Can the test name be changed? The test doesn't fail anywhere when 
`remote.storage.enable=false`. Could you remove the `FailsOnStartup` from test 
name?



##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -62,6 +62,15 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
     topicConfig.asScala.forKeyValue { (key, value) =>
       if (!configNamesToExclude.contains(key)) props.put(key, value)
     }
+
+    val remoteLogStorageEnable = 
topicConfig.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)
+
+    if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()
+      && remoteLogStorageEnable != null
+      && remoteLogStorageEnable.toBoolean) {
+      throw new ConfigException(s"You have to delete all topics with the 
property remote.storage.enable (i.e. $topic) before disabling tiered storage 
cluster-wide")

Review Comment:
   `delete` -> `disable`.  We can hold on this till KAFKA-15290 gets reviewed.



##########
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##########
@@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness {
     server.shutdown()
   }
 
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit 
= {
+    val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 true.toString)
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+      
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+      "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+    val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+    server.remoteLogManagerOpt match {
+      case Some(_) =>
+      case None => fail("RemoteLogManager should be initialized")
+    }
+
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
true.toString)
+
+    TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)
+
+    server.shutdown()
+
+    val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+    assertThrows(classOf[ConfigException], () => 
TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps)))
+  }
+
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithDisabledTieredTopic(): Unit 
= {
+    val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 true.toString)
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+      
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+    
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+      "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+    var server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+    server.remoteLogManagerOpt match {
+      case Some(_) =>
+      case None => fail("RemoteLogManager should be initialized")
+    }
+
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
false.toString)
+
+    TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)
+
+    server.shutdown()
+
+    val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+    server = TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps))
+
+    server.shutdown()
+  }
+
+  @Test
+  def 
testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringEnabled(): Unit 
= {
+    val serverProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+    val server = TestUtils.createServer(KafkaConfig.fromProps(serverProps))
+
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
true.toString)
+
+    TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)

Review Comment:
   If system-level tiered storage is not enabled, then topic creation with 
`remote.storage.enable=true` should fail now.



##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -62,6 +62,15 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
     topicConfig.asScala.forKeyValue { (key, value) =>
       if (!configNamesToExclude.contains(key)) props.put(key, value)
     }
+
+    val remoteLogStorageEnable = 
topicConfig.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)
+
+    if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   Instead of 
   
   `kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()` ->
   
   `kafkaConfig.isRemoteLogStorageSystemEnabled`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to