dajac commented on a change in pull request #8933:
URL: https://github.com/apache/kafka/pull/8933#discussion_r456307126



##########
File path: 
core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
##########
@@ -0,0 +1,361 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package kafka.server
+
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.TimeUnit
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.internals.KafkaFutureImpl
+import org.apache.kafka.common.message.CreatePartitionsRequestData
+import 
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.message.DeleteTopicsRequestData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.quota.ClientQuotaAlteration
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.requests.AlterClientQuotasRequest
+import org.apache.kafka.common.requests.AlterClientQuotasResponse
+import org.apache.kafka.common.requests.CreatePartitionsRequest
+import org.apache.kafka.common.requests.CreatePartitionsResponse
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.requests.CreateTopicsResponse
+import org.apache.kafka.common.requests.DeleteTopicsRequest
+import org.apache.kafka.common.requests.DeleteTopicsResponse
+import org.apache.kafka.common.security.auth.AuthenticationContext
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Before
+import org.junit.Test
+
+import scala.jdk.CollectionConverters._
+
+object ControllerMutationQuotaTest {
+  // Principal used for all client connections. This is updated by each test.
+  var principal = KafkaPrincipal.ANONYMOUS
+  class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      principal
+    }
+  }
+
+  def asPrincipal(newPrincipal: KafkaPrincipal)(f: => Unit): Unit = {
+    val currentPrincipal = principal
+    principal = newPrincipal
+    try f
+    finally principal = currentPrincipal
+  }
+
+  val ThrottledPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"ThrottledPrincipal")
+  val UnboundedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"UnboundedPrincipal")
+
+  val StrictCreateTopicsRequestVersion = ApiKeys.CREATE_TOPICS.latestVersion
+  val PermissiveCreateTopicsRequestVersion = 5.toShort
+
+  val StrictDeleteTopicsRequestVersion = ApiKeys.DELETE_TOPICS.latestVersion
+  val PermissiveDeleteTopicsRequestVersion = 4.toShort
+
+  val StrictCreatePartitionsRequestVersion = 
ApiKeys.CREATE_PARTITIONS.latestVersion
+  val PermissiveCreatePartitionsRequestVersion = 2.toShort
+
+  val TopicsWithOnePartition = Seq("topic-1" ->  1, "topic-2" ->  1)
+  val TopicsWith30Partitions = Seq("topic-1" -> 30, "topic-2" -> 30)
+  val TopicsWith31Partitions = Seq("topic-1" -> 31, "topic-2" -> 31)
+
+  val ControllerMutationRate = 2.0
+}
+
+class ControllerMutationQuotaTest extends BaseRequestTest {
+  import ControllerMutationQuotaTest._
+
+  override def brokerCount: Int = 1
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+    properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+    properties.put(KafkaConfig.PrincipalBuilderClassProp,
+      classOf[ControllerMutationQuotaTest.TestPrincipalBuilder].getName)
+    // We use the default number of samples and window size.
+    properties.put(KafkaConfig.NumControllerQuotaSamplesProp, "11")
+    properties.put(KafkaConfig.ControllerQuotaWindowSizeSecondsProp, "1")
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    // Define a quota for ThrottledPrincipal
+    defineUserQuota(ThrottledPrincipal.getName, Some(ControllerMutationRate))
+    waitUserQuota(ThrottledPrincipal.getName, ControllerMutationRate)
+  }
+
+  @Test
+  def testSetUnsetQuota(): Unit = {
+    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "User")
+    // Default Value
+    waitUserQuota(principal.getName, Long.MaxValue)
+    // Define a new quota
+    defineUserQuota(principal.getName, Some(ControllerMutationRate))
+    // Check it
+    waitUserQuota(principal.getName, ControllerMutationRate)
+    // Remove it
+    defineUserQuota(principal.getName, None)
+    // Back to the default
+    waitUserQuota(principal.getName, Long.MaxValue)
+  }
+
+  @Test
+  def testStrictCreateTopicsRequest(): Unit = {
+    asPrincipal(ThrottledPrincipal) {
+      // Create two topics worth of 30 partitions each. As we use a strict 
quota, we
+      // expect the first topic to be created and the second to be rejected.
+      // Theoretically, the throttle time should be bellow or equal to:
+      // ((30 / 10) - 2) / 2 * 10 = 5s
+      val (throttleTimeMs1, errors1) = createTopics(TopicsWith30Partitions, 
StrictCreateTopicsRequestVersion)
+      assertTrue((5000 - throttleTimeMs1) < 1000)
+      assertEquals(Seq(Errors.NONE, Errors.THROTTLING_QUOTA_EXCEEDED), errors1)
+
+      // The implementation of the Rate has NOT been changed yet so we have to 
wait past
+      // the window in order to get the avg rate bellow the quota.
+      Thread.sleep(11000) // Thread.sleep(throttleTimeMs1)

Review comment:
       I do agree that this is pretty annoying. I am a bit reluctant to using 
mock time here as these are real integration tests so I would like to test the 
real conditions. When we will have the improvement for the quota, the sleep 
time will be <5s and we have 3 of them in the whole test suite. IMO, 15s is not 
that bad. We do similar in the other quota integration tests for example.
   
   I may be able to reduce it a bit by tweaking the quota used. Let me try.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to