[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-15 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1294874991


##
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
##
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.InvalidConfigurationException
+import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{Collections, Properties}
+import scala.collection.Seq
+import scala.concurrent.ExecutionException
+import scala.util.Random
+
+@Tag("integration")
+class RemoteTopicCrudTest extends IntegrationTestHarness {
+
+  val numPartitions = 2
+  val numReplicationFactor = 2
+  var testTopicName: String = _
+  var sysRemoteStorageEnabled = true
+
+  override protected def brokerCount: Int = 2
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+props.foreach(p => p.putAll(overrideProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+Seq(overrideProps())
+  }
+
+  @BeforeEach
+  override def setUp(info: TestInfo): Unit = {
+if 
(info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) {
+  sysRemoteStorageEnabled = false
+}
+super.setUp(info)
+testTopicName = 
s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): 
Unit = {
+// inherited local retention ms is 1000
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): 
Unit = {
+//

[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-15 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1294846904


##
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
##
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.InvalidConfigurationException
+import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{Collections, Properties}
+import scala.collection.Seq
+import scala.concurrent.ExecutionException
+import scala.util.Random
+
+@Tag("integration")
+class RemoteTopicCrudTest extends IntegrationTestHarness {
+
+  val numPartitions = 2
+  val numReplicationFactor = 2
+  var testTopicName: String = _
+  var sysRemoteStorageEnabled = true
+
+  override protected def brokerCount: Int = 2
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+props.foreach(p => p.putAll(overrideProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+Seq(overrideProps())
+  }
+
+  @BeforeEach
+  override def setUp(info: TestInfo): Unit = {
+if 
(info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) {
+  sysRemoteStorageEnabled = false
+}
+super.setUp(info)
+testTopicName = 
s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): 
Unit = {
+// inherited local retention ms is 1000
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): 
Unit = {
+//

[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-15 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1294410814


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -265,7 +266,12 @@ public Optional serverConfigName(String 
configName) {
 .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
 TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
 .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+// RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP is 
defined here to ensure that when system
+// level remote storage functionality is disabled, topics cannot 
be configured to use remote storage.
+
.defineInternal(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
BOOLEAN,
+
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM,
+
RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC);

Review Comment:
   I disagree with passing just one specific config instead of the generic 
kafkaConfig object, but in the interest of wrapping up this PR, since it has 
undergone multiple revisions, I am OK with this approach. We can always come 
back and refactor later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-14 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1293349126


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -231,7 +231,7 @@ class ControllerServer(
   setMetrics(quorumControllerMetrics).
   setCreateTopicPolicy(createTopicPolicy.asJava).
   setAlterConfigPolicy(alterConfigPolicy.asJava).
-  setConfigurationValidator(new ControllerConfigurationValidator()).
+  setConfigurationValidator(new 
ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment:
   Nevertheless, we can discuss it in the separate JIRA that I created above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-14 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1293348162


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -231,7 +231,7 @@ class ControllerServer(
   setMetrics(quorumControllerMetrics).
   setCreateTopicPolicy(createTopicPolicy.asJava).
   setAlterConfigPolicy(alterConfigPolicy.asJava).
-  setConfigurationValidator(new ControllerConfigurationValidator()).
+  setConfigurationValidator(new 
ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment:
   Nope, I would disagree that the scenario is complicated. For larger clusters 
containing hundreds of nodes, a rolling restart can take a long time. Any 
functionality that we introduce in the Kafka code base should be able to handle 
scenarios where some brokers have a feature enabled and others don't. In the 
existing code base this is achieved by using "features" [1]. When a broker 
sends metadata to the controller, it also sends the "features" that it 
supports. In our situation, we need to add TS (Tiered Storage) as a "feature". 
So during a rolling restart, the controller knows that not all brokers have the 
feature and will reject any call to enable TS for a topic. After the rolling 
restart is complete, the controller will know that all brokers have the TS 
feature, and hence it can start enabling TS for topics.
   
   [1] 
https://github.com/apache/kafka/blob/43751d8d0521b1440a823a9430fdb0659ce7c436/core/src/main/scala/kafka/server/BrokerFeatures.scala#L33



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-14 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1293181861


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -231,7 +231,7 @@ class ControllerServer(
   setMetrics(quorumControllerMetrics).
   setCreateTopicPolicy(createTopicPolicy.asJava).
   setAlterConfigPolicy(alterConfigPolicy.asJava).
-  setConfigurationValidator(new ControllerConfigurationValidator()).
+  setConfigurationValidator(new 
ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment:
   I created a JIRA for the scenario mentioned above. We can consider it 
separately. https://issues.apache.org/jira/browse/KAFKA-15341



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-14 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1293158075


##
core/src/test/scala/integration/kafka/admin/RemoteTopicCRUDTest.scala:
##
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.InvalidConfigurationException
+import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{Collections, Properties}
+import scala.collection.Seq
+import scala.concurrent.ExecutionException
+import scala.util.Random
+
+@Tag("integration")
+class RemoteTopicCRUDTest extends IntegrationTestHarness {
+
+  val numPartitions = 2
+  val numReplicationFactor = 2
+  var testTopicName: String = _
+
+  override protected def brokerCount: Int = 2
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+props.foreach(p => p.putAll(overrideProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+Seq(overrideProps())
+  }
+
+  @BeforeEach
+  override def setUp(info: TestInfo): Unit = {
+super.setUp(info)
+testTopicName = 
s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): 
Unit = {
+// inherited local retention ms is 1000
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): 
Unit = {
+// inherited local retention bytes is 1024
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfi

[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-11 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1291232121


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -231,7 +231,7 @@ class ControllerServer(
   setMetrics(quorumControllerMetrics).
   setCreateTopicPolicy(createTopicPolicy.asJava).
   setAlterConfigPolicy(alterConfigPolicy.asJava).
-  setConfigurationValidator(new ControllerConfigurationValidator()).
+  setConfigurationValidator(new 
ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment:
   `sharedServer` represents the case when the KRaft controller is run on a 
broker and not as an independent node. In such a case, a broker has two 
responsibilities: one to act as a controller and another to act as a broker. 
These two configs represent the configurations associated with the node's role 
as a broker and as a controller. 
   
   In our case here, when createTopic or alterConfig is called to enable TS for 
a topic, it will be forwarded to the controller. The controller will validate 
the config using `ControllerConfigurationValidator` before applying it. The 
assumption here is that the broker-level configuration has already been 
validated before forwarding. Now, this is the first case where we want to make 
an assertion involving both broker-level configuration and topic-level 
configuration. I would ideally have wanted to fail fast at the broker itself, 
before the request is sent to the controller, by adding the validation at 
https://github.com/apache/kafka/blob/f137da04fa71734d176e19f5800622f4b4dfdb66/core/src/main/scala/kafka/server/ConfigAdminManager.scala#L155
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-11 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1291265129


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -231,7 +231,7 @@ class ControllerServer(
   setMetrics(quorumControllerMetrics).
   setCreateTopicPolicy(createTopicPolicy.asJava).
   setAlterConfigPolicy(alterConfigPolicy.asJava).
-  setConfigurationValidator(new ControllerConfigurationValidator()).
+  setConfigurationValidator(new 
ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment:
   Thinking more about it... this is more complicated than I expected.
   
   We want to block enablement of a topic-level config unless all brokers have 
TS enabled on them. We need a way to determine that TS has been enabled on 
all brokers. In the KRaft world, no component has a view of all broker configs, 
not even the controller (correct me if I am wrong here), because broker-level 
config lives in each broker's separate server.properties file.
   
   As an example, consider what happens when we are in a rolling restart and 
some brokers have TS enabled on them and some don't. We send an alter config 
call to enable TS for a topic, and it hits a broker which has TS enabled. This 
broker forwards it to the controller, and the controller will send the config 
update to all brokers. When another broker which doesn't have TS enabled gets 
this config change, it "should" fail to apply it. But failing now is too late: 
alterConfig has already succeeded, because controller->broker config 
propagation is done asynchronously.
   
   With this limitation in mind, the ideal solution is:
   1. Add logic in the controller such that it knows the broker-level config of 
all brokers (does it already know that in metadata?).
   2. When a request to enable TS for a topic arrives, ensure that all brokers 
have TS enabled; if not, reject it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-11 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1291234961


##
core/src/test/scala/integration/kafka/admin/RemoteTopicCreateAndUpdateConfigTest.scala:
##
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.InvalidConfigurationException
+import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{Collections, Properties}
+import scala.collection.Seq
+import scala.concurrent.ExecutionException
+import scala.util.Random
+
+class RemoteTopicCreateAndUpdateConfigTest extends KafkaServerTestHarness with 
Logging {

Review Comment:
   Please extend IntegrationTestHarness and then use the functions defined in 
it, such as `createAdminClient`. This is beneficial because that harness takes 
care of cleaning up resources properly at the end of the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-11 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1291232121


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -231,7 +231,7 @@ class ControllerServer(
   setMetrics(quorumControllerMetrics).
   setCreateTopicPolicy(createTopicPolicy.asJava).
   setAlterConfigPolicy(alterConfigPolicy.asJava).
-  setConfigurationValidator(new ControllerConfigurationValidator()).
+  setConfigurationValidator(new 
ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment:
   `sharedServer` represents the case when the KRaft controller is run on a 
broker and not as an independent node. In such a case, a broker has two 
responsibilities: one to act as a controller and another to act as a broker. 
These two configs represent the configurations associated with the node's role 
as a broker and as a controller. 
   
   In our case here, when createTopic or alterConfig is called to enable TS for 
a topic, it will be forwarded to the controller. The controller will validate 
the config using `ControllerConfigurationValidator` before applying it. The 
assumption here is that the broker-level configuration has already been 
validated before forwarding. Now, this is the first case where we want to make 
an assertion involving both broker-level configuration and topic-level 
configuration. I would ideally have wanted to fail fast at the broker itself, 
before the request is sent to the controller, by adding the validation at 
https://github.com/apache/kafka/blob/f137da04fa71734d176e19f5800622f4b4dfdb66/core/src/main/scala/kafka/server/ConfigAdminManager.scala#L155
 Perhaps, you can make a change there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-11 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1291222788


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
 long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
 if (minCompactionLag > maxCompactionLag) {
 throw new InvalidConfigurationException("conflict topic config 
setting "
-+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
 }
+}
 
-if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-throw new ConfigException("Remote log storage is unsupported 
for the compacted topics");
-}
+public static void validateValuesInBroker(Map props) {
+validateValues(props);
+Boolean isRemoteLogStorageSystemEnabled =
+(Boolean) 
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+Boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+throw new ConfigException("Tiered Storage functionality is 
disabled in the broker. " +
+"Topic cannot be configured with remote log storage.");
 }
 
-if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
-Long retentionBytes = (Long) 
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
-Long localLogRetentionBytes = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
-if (retentionBytes > -1 && localLogRetentionBytes != -2) {
-if (localLogRetentionBytes == -1) {
-String message = String.format("Value must not be -1 as %s 
value is set as %d.",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
-if (localLogRetentionBytes > retentionBytes) {
-String message = String.format("Value must not be more 
than %s property value: %d",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
+String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
+if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
+throw new ConfigException("Remote log storage is unsupported for 
the compacted topics");
+}
+
+Long retentionBytes = (Long) 
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
+Long localLogRetentionBytes = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
+if (isRemoteStorageEnabled && retentionBytes > -1 && 
localLogRetentionBytes != -2) {
+if (localLogRetentionBytes == -1) {
+String message = String.format("Value must not be -1 as %s 
value is set as %d.",
+TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
+throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
+}
+if (localLogRetentionBytes > retentionBytes) {
+String message = String.format("Value must not be more than %s 
property value: %d",
+TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
+throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
 }

Review Comment:
   I should have been clearer. LogConfig.validateValues() is called on both the 
client and the server. It is actually also called when we create LogManager, 
e.g. at 
https://github.com/apache/kafka/blob/f137da04fa71734d176e19f5800622f4b4dfdb66/core/src/main/scala/kafka/log/LogManager.scala#L1379
 
   
   My point in suggesting to add this in `validateValues()` was that i

[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-11 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1291213288


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
 long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
 if (minCompactionLag > maxCompactionLag) {
 throw new InvalidConfigurationException("conflict topic config 
setting "
-+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
 }
+}
 
-if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-throw new ConfigException("Remote log storage is unsupported 
for the compacted topics");
-}
+public static void validateValuesInBroker(Map props) {
+validateValues(props);
+Boolean isRemoteLogStorageSystemEnabled =
+(Boolean) 
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+Boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+throw new ConfigException("Tiered Storage functionality is 
disabled in the broker. " +
+"Topic cannot be configured with remote log storage.");
 }
 
-if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
-Long retentionBytes = (Long) 
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
-Long localLogRetentionBytes = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
-if (retentionBytes > -1 && localLogRetentionBytes != -2) {
-if (localLogRetentionBytes == -1) {
-String message = String.format("Value must not be -1 as %s 
value is set as %d.",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
-if (localLogRetentionBytes > retentionBytes) {
-String message = String.format("Value must not be more 
than %s property value: %d",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
+String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
+if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
+throw new ConfigException("Remote log storage is unsupported for 
the compacted topics");
+}

Review Comment:
   @kamalcph are we planning to work on this comment in this PR? I would prefer 
that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1288894765


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
 long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
 if (minCompactionLag > maxCompactionLag) {
 throw new InvalidConfigurationException("conflict topic config 
setting "
-+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
 }
+}
 
-if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-throw new ConfigException("Remote log storage is unsupported 
for the compacted topics");
-}
+public static void validateValuesInBroker(Map props) {
+validateValues(props);
+Boolean isRemoteLogStorageSystemEnabled =
+(Boolean) 
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+Boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+throw new ConfigException("Tiered Storage functionality is 
disabled in the broker. " +
+"Topic cannot be configured with remote log storage.");
 }
 
-if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
-Long retentionBytes = (Long) 
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
-Long localLogRetentionBytes = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
-if (retentionBytes > -1 && localLogRetentionBytes != -2) {
-if (localLogRetentionBytes == -1) {
-String message = String.format("Value must not be -1 as %s 
value is set as %d.",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
-if (localLogRetentionBytes > retentionBytes) {
-String message = String.format("Value must not be more 
than %s property value: %d",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
+String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
+if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
+throw new ConfigException("Remote log storage is unsupported for 
the compacted topics");
+}

Review Comment:
   could be a separate function - `validateNoRemoteStorageForCompactedTopic`



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
 long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
 if (minCompactionLag > maxCompactionLag) {
 throw new InvalidConfigurationException("conflict topic config 
setting "
-+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
 }
+}
 
-if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-throw new ConfigException("Remote log storage is unsupported 
for the c