[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1294874991 ## core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala: ## @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.api.IntegrationTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.errors.InvalidConfigurationException +import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable +import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util +import java.util.{Collections, Properties} +import scala.collection.Seq +import scala.concurrent.ExecutionException +import scala.util.Random + +@Tag("integration") +class RemoteTopicCrudTest extends IntegrationTestHarness { + + val numPartitions = 2 + val numReplicationFactor = 2 + var testTopicName: String = _ + var sysRemoteStorageEnabled = true + + override protected def brokerCount: Int = 2 + + override protected def modifyConfigs(props: Seq[Properties]): Unit = { +props.foreach(p => p.putAll(overrideProps())) + } + + override protected def kraftControllerConfigs(): Seq[Properties] = { +Seq(overrideProps()) + } + + @BeforeEach + override def setUp(info: TestInfo): Unit = { +if (info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) { + sysRemoteStorageEnabled = false +} +super.setUp(info) +testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}" + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = { +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200") +topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) +verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def 
testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = { +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512") +topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) +verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = { +// inherited local retention ms is 1000 +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) +verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = { +//
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1294846904 ## core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala: ## @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.api.IntegrationTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.errors.InvalidConfigurationException +import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable +import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util +import java.util.{Collections, Properties} +import scala.collection.Seq +import scala.concurrent.ExecutionException +import scala.util.Random + +@Tag("integration") +class RemoteTopicCrudTest extends IntegrationTestHarness { + + val numPartitions = 2 + val numReplicationFactor = 2 + var testTopicName: String = _ + var sysRemoteStorageEnabled = true + + override protected def brokerCount: Int = 2 + + override protected def modifyConfigs(props: Seq[Properties]): Unit = { +props.foreach(p => p.putAll(overrideProps())) + } + + override protected def kraftControllerConfigs(): Seq[Properties] = { +Seq(overrideProps()) + } + + @BeforeEach + override def setUp(info: TestInfo): Unit = { +if (info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) { + sysRemoteStorageEnabled = false +} +super.setUp(info) +testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}" + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = { +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200") +topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) +verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def 
testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = { +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512") +topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) +verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = { +// inherited local retention ms is 1000 +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) +verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = { +//
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1294410814

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
## @@ -265,7 +266,12 @@ public Optional<String> serverConfigName(String configName) {
         .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
             TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
         .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-            TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+            TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+        // RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP is defined here to ensure that when system
+        // level remote storage functionality is disabled, topics cannot be configured to use remote storage.
+        .defineInternal(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN,
+            RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM,
+            RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC);

Review Comment: I disagree with passing just one specific config instead of the generic KafkaConfig object, but in the interest of wrapping up this PR, which has already gone through multiple revisions, I am OK with this approach. We can always come back and refactor later.
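For readers following along, here is a minimal, hypothetical sketch of the check that defining this internal broker-level property makes possible: comparing the broker-level switch with the topic-level switch in a single validation step. This is not the PR's code (the actual change is the validateValuesInBroker method quoted later in this thread); the class name and exception type are illustrative, and the two key strings are assumptions taken from the configs referenced in the diff.

```java
import java.util.Map;

public final class RemoteStorageFlagCheckSketch {

    // Assumed config keys, based on the properties referenced in the diff above.
    static final String SYSTEM_LEVEL_ENABLE = "remote.log.storage.system.enable"; // broker level
    static final String TOPIC_LEVEL_ENABLE = "remote.storage.enable";             // topic level

    /**
     * Fails when a topic asks for remote storage while the broker-level switch is off.
     * Assumes the broker-level value has already been merged into the effective topic
     * properties, which is what defining the internal property in LogConfig enables.
     */
    public static void checkRemoteStorageAllowed(Map<String, Object> effectiveProps) {
        boolean systemEnabled = Boolean.TRUE.equals(effectiveProps.get(SYSTEM_LEVEL_ENABLE));
        boolean topicEnabled = Boolean.TRUE.equals(effectiveProps.get(TOPIC_LEVEL_ENABLE));
        if (topicEnabled && !systemEnabled) {
            throw new IllegalArgumentException(
                "Tiered storage is disabled on the broker; the topic cannot enable remote log storage.");
        }
    }
}
```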
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1293349126

## core/src/main/scala/kafka/server/ControllerServer.scala:
## @@ -231,7 +231,7 @@ class ControllerServer(
         setMetrics(quorumControllerMetrics).
         setCreateTopicPolicy(createTopicPolicy.asJava).
         setAlterConfigPolicy(alterConfigPolicy.asJava).
-        setConfigurationValidator(new ControllerConfigurationValidator()).
+        setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment: Nevertheless, we can discuss it in the separate JIRA that I created above.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1293348162

## core/src/main/scala/kafka/server/ControllerServer.scala:
## @@ -231,7 +231,7 @@ class ControllerServer(
         setMetrics(quorumControllerMetrics).
         setCreateTopicPolicy(createTopicPolicy.asJava).
         setAlterConfigPolicy(alterConfigPolicy.asJava).
-        setConfigurationValidator(new ControllerConfigurationValidator()).
+        setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment: Nope, I would disagree that the scenario is complicated. For larger clusters containing hundreds of nodes, a rolling restart can take a long time. Any functionality that we introduce in the Kafka code base should be able to handle scenarios where some brokers have a feature enabled and others don't. In the existing code base this is achieved by using "features" [1]. When a broker sends its metadata to the controller, it also sends the "features" that it supports. In our situation, we need to add TS as a "feature". During a rolling restart, the controller then knows that not all brokers have the required feature and will reject any call to enable TS for a topic. After the rolling restart is complete, the controller knows that all brokers support the TS feature, so it can start allowing TS to be enabled for topics.

[1] https://github.com/apache/kafka/blob/43751d8d0521b1440a823a9430fdb0659ce7c436/core/src/main/scala/kafka/server/BrokerFeatures.scala#L33
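To make the proposal above concrete, here is a rough, hypothetical sketch of the gate the controller could apply, assuming it can see which features each registered broker reports. This is not existing Kafka code: the feature name and the shape of the broker-features map are assumptions, and a real implementation would plug into the BrokerFeatures mechanism linked in [1].

```java
import java.util.Map;
import java.util.Set;

public final class TieredStorageFeatureGateSketch {

    // Hypothetical feature name; the real mechanism would register TS through BrokerFeatures.
    static final String TIERED_STORAGE_FEATURE = "tiered.storage";

    /**
     * True only when every registered broker advertises tiered storage support.
     * The map is assumed to be built from the feature information brokers send
     * to the controller when they register.
     */
    public static boolean allBrokersSupportTieredStorage(Map<Integer, Set<String>> supportedFeaturesByBrokerId) {
        return !supportedFeaturesByBrokerId.isEmpty()
            && supportedFeaturesByBrokerId.values().stream()
                   .allMatch(features -> features.contains(TIERED_STORAGE_FEATURE));
    }

    /** Reject topic-level enablement while some brokers (e.g. mid rolling upgrade) still lack the feature. */
    public static void ensureTopicCanEnableRemoteStorage(Map<Integer, Set<String>> supportedFeaturesByBrokerId) {
        if (!allBrokersSupportTieredStorage(supportedFeaturesByBrokerId)) {
            throw new IllegalStateException(
                "Not all registered brokers support tiered storage yet; rejecting remote storage enablement for the topic.");
        }
    }
}
```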
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1293181861

## core/src/main/scala/kafka/server/ControllerServer.scala:
## @@ -231,7 +231,7 @@ class ControllerServer(
         setMetrics(quorumControllerMetrics).
         setCreateTopicPolicy(createTopicPolicy.asJava).
         setAlterConfigPolicy(alterConfigPolicy.asJava).
-        setConfigurationValidator(new ControllerConfigurationValidator()).
+        setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment: I created a JIRA for the scenario mentioned above. We can consider it separately. https://issues.apache.org/jira/browse/KAFKA-15341
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1293158075 ## core/src/test/scala/integration/kafka/admin/RemoteTopicCRUDTest.scala: ## @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.api.IntegrationTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.errors.InvalidConfigurationException +import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable +import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util +import java.util.{Collections, Properties} +import scala.collection.Seq +import scala.concurrent.ExecutionException +import scala.util.Random + +@Tag("integration") +class RemoteTopicCRUDTest extends IntegrationTestHarness { + + val numPartitions = 2 + val numReplicationFactor = 2 + var testTopicName: String = _ + + override protected def brokerCount: Int = 2 + + override protected def modifyConfigs(props: Seq[Properties]): Unit = { +props.foreach(p => p.putAll(overrideProps())) + } + + override protected def kraftControllerConfigs(): Seq[Properties] = { +Seq(overrideProps()) + } + + @BeforeEach + override def setUp(info: TestInfo): Unit = { +super.setUp(info) +testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}" + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = { +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200") +topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = { +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512") 
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = { +// inherited local retention ms is 1000 +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = { +// inherited local retention bytes is 1024 +val topicConfig = new Properties() +topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") +topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025") +TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfi
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1291265129

## core/src/main/scala/kafka/server/ControllerServer.scala:
## @@ -231,7 +231,7 @@ class ControllerServer(
         setMetrics(quorumControllerMetrics).
         setCreateTopicPolicy(createTopicPolicy.asJava).
         setAlterConfigPolicy(alterConfigPolicy.asJava).
-        setConfigurationValidator(new ControllerConfigurationValidator()).
+        setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment: Thinking more about it... this is more complicated than I expected. We want to block enabling the topic-level config unless all brokers have TS enabled, so we need a way to determine that TS is enabled on every broker. In the KRaft world, no component has a view of all broker configs, not even the controller (correct me if I am wrong here), because broker-level config lives in each broker's separate server.properties file. As an example, consider a rolling restart where some brokers have TS enabled and some don't. We send an alterConfig call to enable TS for a topic; it hits a broker that has TS enabled, that broker forwards it to the controller, and the controller sends the config update to all brokers. When another broker that doesn't have TS enabled receives this config change, it "should" fail to apply it. But failing at that point is too late: alterConfig has already succeeded, and controller-to-broker config propagation is asynchronous. With this limitation in mind, the ideal solution is:
1. Add logic in the controller so that it knows the broker-level config of all brokers (does it already know that from metadata?).
2. When a request to enable TS for a topic arrives, ensure that all brokers have TS enabled; if not, reject it.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1291234961 ## core/src/test/scala/integration/kafka/admin/RemoteTopicCreateAndUpdateConfigTest.scala: ## @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{Logging, TestInfoUtils, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.errors.InvalidConfigurationException +import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util +import java.util.{Collections, Properties} +import scala.collection.Seq +import scala.concurrent.ExecutionException +import scala.util.Random + +class RemoteTopicCreateAndUpdateConfigTest extends KafkaServerTestHarness with Logging { Review Comment: please extend IntegrationTestHarness and then use functions defined in it such as `createAdminClient` etc. This is beneficial because that harness takes care to clean up resources properly at the end of the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1291232121

## core/src/main/scala/kafka/server/ControllerServer.scala:
## @@ -231,7 +231,7 @@ class ControllerServer(
         setMetrics(quorumControllerMetrics).
         setCreateTopicPolicy(createTopicPolicy.asJava).
         setAlterConfigPolicy(alterConfigPolicy.asJava).
-        setConfigurationValidator(new ControllerConfigurationValidator()).
+        setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment: `sharedServer` represents the case where the KRaft controller runs on the same node as a broker rather than as an independent node. In that case the node has two responsibilities, acting as a controller and acting as a broker, and these two configs represent the configurations associated with each role. In our case here, when createTopic or alterConfig is called to enable TS for a topic, the request is forwarded to the controller, and the controller validates the config using `ControllerConfigurationValidator` before applying it. The assumption is that the broker-level configuration has already been validated before forwarding. This is the first case where we want to make an assertion that involves both the broker-level configuration and the topic-level configuration. Ideally I would have wanted to fail fast at the broker itself, before the request is sent to the controller, by adding the validation at https://github.com/apache/kafka/blob/f137da04fa71734d176e19f5800622f4b4dfdb66/core/src/main/scala/kafka/server/ConfigAdminManager.scala#L155 Perhaps you can make a change there?
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1291222788 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ## @@ -459,49 +463,53 @@ public static void validateValues(Map props) { long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " -+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " -+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); ++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " ++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } +} -if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { -boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); -String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); -if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { -throw new ConfigException("Remote log storage is unsupported for the compacted topics"); -} +public static void validateValuesInBroker(Map props) { +validateValues(props); +Boolean isRemoteLogStorageSystemEnabled = +(Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); +Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); +if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) { +throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + +"Topic cannot be configured with remote log storage."); } -if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) { -Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); -Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); -if (retentionBytes > -1 && localLogRetentionBytes != -2) { -if (localLogRetentionBytes == -1) { -String message = String.format("Value must not be -1 as %s value is set as %d.", -TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); -throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); -} -if (localLogRetentionBytes > retentionBytes) { -String message = String.format("Value must not be more than %s property value: %d", -TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); -throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); -} +String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); +if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { +throw new ConfigException("Remote log storage is unsupported for the compacted topics"); +} + +Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); +Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); +if (isRemoteStorageEnabled && retentionBytes > -1 && localLogRetentionBytes != -2) { +if (localLogRetentionBytes == -1) { +String message = String.format("Value must not be -1 as %s value is set as %d.", +TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); +throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); +} +if (localLogRetentionBytes > retentionBytes) 
{
+String message = String.format("Value must not be more than %s property value: %d",
+TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
+throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message);
}

Review Comment: I should have been clearer. LogConfig.validateValues() is called on both the client and the server. It is actually also called when we create the LogManager, e.g. at https://github.com/apache/kafka/blob/f137da04fa71734d176e19f5800622f4b4dfdb66/core/src/main/scala/kafka/log/LogManager.scala#L1379 My point in suggesting to add this in `validateValues()` was that i
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1291213288 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ## @@ -459,49 +463,53 @@ public static void validateValues(Map props) { long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " -+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " -+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); ++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " ++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } +} -if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { -boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); -String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); -if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { -throw new ConfigException("Remote log storage is unsupported for the compacted topics"); -} +public static void validateValuesInBroker(Map props) { +validateValues(props); +Boolean isRemoteLogStorageSystemEnabled = +(Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); +Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); +if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) { +throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + +"Topic cannot be configured with remote log storage."); } -if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) { -Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); -Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); -if (retentionBytes > -1 && localLogRetentionBytes != -2) { -if (localLogRetentionBytes == -1) { -String message = String.format("Value must not be -1 as %s value is set as %d.", -TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); -throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); -} -if (localLogRetentionBytes > retentionBytes) { -String message = String.format("Value must not be more than %s property value: %d", -TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); -throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); -} +String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); +if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { +throw new ConfigException("Remote log storage is unsupported for the compacted topics"); +} Review Comment: @kamalcph are we planning to work on this comment in this PR? I would prefer that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1288894765 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ## @@ -459,49 +463,53 @@ public static void validateValues(Map props) { long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " -+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " -+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); ++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " ++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } +} -if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { -boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); -String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); -if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { -throw new ConfigException("Remote log storage is unsupported for the compacted topics"); -} +public static void validateValuesInBroker(Map props) { +validateValues(props); +Boolean isRemoteLogStorageSystemEnabled = +(Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); +Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); +if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) { +throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + +"Topic cannot be configured with remote log storage."); } -if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) { -Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); -Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); -if (retentionBytes > -1 && localLogRetentionBytes != -2) { -if (localLogRetentionBytes == -1) { -String message = String.format("Value must not be -1 as %s value is set as %d.", -TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); -throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); -} -if (localLogRetentionBytes > retentionBytes) { -String message = String.format("Value must not be more than %s property value: %d", -TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); -throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); -} +String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); +if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { +throw new ConfigException("Remote log storage is unsupported for the compacted topics"); +} Review Comment: could be a separate function - `validateNoRemoteStorageForCompactedTopic` ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ## @@ -459,49 +463,53 @@ public static void validateValues(Map props) { long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " -+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " -+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")"); ++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " ++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } +} -if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { -boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); -String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); -if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { -throw new ConfigException("Remote log storage is unsupported for the c