kamalcph commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1296143313
##########
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##########
@@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness {
server.shutdown()
}
+ @Test
+ def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit
= {
+ val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
true.toString)
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+ "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+ val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+ server.remoteLogManagerOpt match {
+ case Some(_) =>
+ case None => fail("RemoteLogManager should be initialized")
+ }
+
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
true.toString)
+
+ TestUtils.createTopic(zkClient = server.zkClient, topic = "batman",
servers = Seq(server), topicConfig = topicProps)
+
+ server.shutdown()
+
+ val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+ assertThrows(classOf[ConfigException], () =>
TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps)))
+ }
+
+ @Test
+ def testClusterWideDisablementOfTieredStorageWithDisabledTieredTopic(): Unit
= {
+ val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
true.toString)
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+ "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+ var server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+ server.remoteLogManagerOpt match {
+ case Some(_) =>
+ case None => fail("RemoteLogManager should be initialized")
+ }
+
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
false.toString)
+
+ TestUtils.createTopic(zkClient = server.zkClient, topic = "batman",
servers = Seq(server), topicConfig = topicProps)
+
+ server.shutdown()
+
+ val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+ server = TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps))
+
+ server.shutdown()
+ }
+
+ @Test
+ def
testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringEnabled(): Unit
= {
+ val serverProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+ val server = TestUtils.createServer(KafkaConfig.fromProps(serverProps))
+
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
true.toString)
+
+ TestUtils.createTopic(zkClient = server.zkClient, topic = "batman",
servers = Seq(server), topicConfig = topicProps)
+
+ server.shutdown()
+
+ assertThrows(classOf[ConfigException], () =>
TestUtils.createServer(KafkaConfig.fromProps(serverProps)))
+ }
+
+ @Test
+ def
testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringDisabled(): Unit
= {
Review Comment:
Can the test name be changed? The test doesn't fail anywhere when
`remote.storage.enable=false`. Could you remove `FailsOnStartup` from the test
name?
##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -62,6 +62,15 @@ class TopicConfigHandler(private val logManager: LogManager,
kafkaConfig: KafkaC
topicConfig.asScala.forKeyValue { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
+
+ val remoteLogStorageEnable =
topicConfig.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)
+
+ if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()
+ && remoteLogStorageEnable != null
+ && remoteLogStorageEnable.toBoolean) {
+ throw new ConfigException(s"You have to delete all topics with the
property remote.storage.enable (i.e. $topic) before disabling tiered storage
cluster-wide")
Review Comment:
`delete` -> `disable`. We can hold off on this until KAFKA-15290 gets reviewed.
##########
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##########
@@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness {
server.shutdown()
}
+ @Test
+ def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit
= {
+ val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
true.toString)
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+ "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+ val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+ server.remoteLogManagerOpt match {
+ case Some(_) =>
+ case None => fail("RemoteLogManager should be initialized")
+ }
+
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
true.toString)
+
+ TestUtils.createTopic(zkClient = server.zkClient, topic = "batman",
servers = Seq(server), topicConfig = topicProps)
+
+ server.shutdown()
+
+ val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+ assertThrows(classOf[ConfigException], () =>
TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps)))
+ }
+
+ @Test
+ def testClusterWideDisablementOfTieredStorageWithDisabledTieredTopic(): Unit
= {
+ val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
true.toString)
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+ "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+ var server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+ server.remoteLogManagerOpt match {
+ case Some(_) =>
+ case None => fail("RemoteLogManager should be initialized")
+ }
+
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
false.toString)
+
+ TestUtils.createTopic(zkClient = server.zkClient, topic = "batman",
servers = Seq(server), topicConfig = topicProps)
+
+ server.shutdown()
+
+ val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+ server = TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps))
+
+ server.shutdown()
+ }
+
+ @Test
+ def
testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringEnabled(): Unit
= {
+ val serverProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+ val server = TestUtils.createServer(KafkaConfig.fromProps(serverProps))
+
+ val topicProps = new Properties()
+ topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
true.toString)
+
+ TestUtils.createTopic(zkClient = server.zkClient, topic = "batman",
servers = Seq(server), topicConfig = topicProps)
Review Comment:
If system-level tiered storage is not enabled, then topic creation with
`remote.storage.enable=true` should fail now.
##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -62,6 +62,15 @@ class TopicConfigHandler(private val logManager: LogManager,
kafkaConfig: KafkaC
topicConfig.asScala.forKeyValue { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
+
+ val remoteLogStorageEnable =
topicConfig.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)
+
+ if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()
Review Comment:
Instead of
`kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()`, use
`kafkaConfig.isRemoteLogStorageSystemEnabled`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]