m1a2st commented on code in PR #19807:
URL: https://github.com/apache/kafka/pull/19807#discussion_r2111445365


##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -242,54 +243,54 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
     try {
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.DefaultUserEntity), 
-        None, 
-        Some(new Quota(1000, true))
+        Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE),
+        Optional.empty,
+        Optional.of(new Quota(1000, true))
       )
       clientQuotaManager.updateQuota(
-        None,
-        Some(ClientQuotaManager.DefaultClientIdEntity),
-        Some(new Quota(2000, true))
+        Optional.empty,
+        Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE),
+        Optional.of(new  Quota(2000, true))

Review Comment:
   ditto: there's an extra space after `new` here.
   ```suggestion
           Optional.of(new Quota(2000, true))
   ```



##########
server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.QuotaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used 
in the context
+ * of throttling controller's operations/mutations.
+ * config @ClientQuotaManagerConfig quota configs
+ * metrics @Metrics Metrics instance
+ * time @Time object to use
+ * threadNamePrefix The thread prefix to use
+ * quotaCallback @ClientQuotaCallback ClientQuotaCallback to use

Review Comment:
   It’s better to move these @param tags to the constructor. Class-level JavaDoc 
should not include @param tags.



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -242,54 +243,54 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
     try {
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.DefaultUserEntity), 
-        None, 
-        Some(new Quota(1000, true))
+        Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE),
+        Optional.empty,
+        Optional.of(new Quota(1000, true))
       )
       clientQuotaManager.updateQuota(
-        None,
-        Some(ClientQuotaManager.DefaultClientIdEntity),
-        Some(new Quota(2000, true))
+        Optional.empty,
+        Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE),
+        Optional.of(new  Quota(2000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.DefaultUserEntity),
-        Some(ClientQuotaManager.DefaultClientIdEntity), 
-        Some(new Quota(3000, true))
+        Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE),
+        Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE),
+        Optional.of(new Quota(3000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")), 
-        None, 
-        Some(new Quota(4000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.empty,
+        Optional.of(new  Quota(4000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")), 
-        Some(ClientQuotaManager.ClientIdEntity("client1")),
-        Some(new Quota(5000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.of(new ClientQuotaManager.ClientIdEntity("client1")),
+        Optional.of(new Quota(5000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userB")), 
-        None, 
-        Some(new Quota(6000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userB")),
+        Optional.empty,
+        Optional.of(new Quota(6000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userB")),
-        Some(ClientQuotaManager.ClientIdEntity("client1")),
-        Some(new Quota(7000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userB")),
+        Optional.of(new ClientQuotaManager.ClientIdEntity("client1")),
+        Optional.of(new Quota(7000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userB")),
-        Some(ClientQuotaManager.DefaultClientIdEntity), 
-        Some(new Quota(8000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userB")),
+        Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE),
+        Optional.of(new Quota(8000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userC")), 
-        None, 
-        Some(new Quota(10000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userC")),
+        Optional.empty,
+        Optional.of(new Quota(10000, true))
       )
       clientQuotaManager.updateQuota(
-        None,
-        Some(ClientQuotaManager.ClientIdEntity("client1")),
-        Some(new Quota(9000, true))
+        Optional.empty,
+        Optional.of(new ClientQuotaManager.ClientIdEntity("client1")),
+        Optional.of(new  Quota(9000, true))

Review Comment:
   ditto
   ```suggestion
           Optional.of(new Quota(9000, true))
   ```



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -329,40 +330,40 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
       // Update quotas
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")), 
-        None, 
-        Some(new Quota(8000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.empty,
+        Optional.of(new Quota(8000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")),
-        Some(ClientQuotaManager.ClientIdEntity("client1")),
-        Some(new Quota(10000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.of(new ClientQuotaManager.ClientIdEntity("client1")),
+        Optional.of(new  Quota(10000, true))
       )
       checkQuota(clientQuotaManager, "userA", "client2", 8000, 0, 
expectThrottle = false)
       checkQuota(clientQuotaManager, "userA", "client2", 8000, 4500, 
expectThrottle = true) // Throttled due to sum of new and earlier values
       checkQuota(clientQuotaManager, "userA", "client1", 10000, 0, 
expectThrottle = false)
       checkQuota(clientQuotaManager, "userA", "client1", 10000, 6000, 
expectThrottle = true)
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")),
-        Some(ClientQuotaManager.ClientIdEntity("client1")),
-        None
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.of(new ClientQuotaManager.ClientIdEntity("client1")),
+        Optional.empty
       )
       checkQuota(clientQuotaManager, "userA", "client6", 8000, 0, 
expectThrottle = true)    // Throttled due to shared user quota
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")),
-        Some(ClientQuotaManager.ClientIdEntity("client6")),
-        Some(new Quota(11000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.of(new ClientQuotaManager.ClientIdEntity("client6")),
+        Optional.of(new  Quota(11000, true))

Review Comment:
   ditto
   ```suggestion
           Optional.of(new Quota(11000, true))
   ```



##########
server/src/main/java/org/apache/kafka/server/AbstractControllerMutationQuota.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * The AbstractControllerMutationQuota is the base class of 
StrictControllerMutationQuota and
+ * PermissiveControllerMutationQuota.
+ * time @Time object to use

Review Comment:
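   ditto: please move this parameter description to the constructor; class-level 
JavaDoc should not include @param tags.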



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -190,17 +191,17 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
     try {
       // Set <user> quota config
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")), 
-        None, 
-        Some(new Quota(10, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.empty,
+        Optional.of(new  Quota(10, true))

Review Comment:
   ditto
   ```suggestion
           Optional.of(new Quota(10, true))
   ```



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -84,12 +85,12 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
       clientQuotaManager.updateQuota(
         client1.configUser,
         client1.configClientEntity,
-        None
+        Optional.empty
       )
       clientQuotaManager.updateQuota(
         defaultConfigClient.configUser,
         defaultConfigClient.configClientEntity,
-        Some(new Quota(4000, true))
+        Optional.of(new  Quota(4000, true))

Review Comment:
   There's an extra space here.
   ```suggestion
           Optional.of(new Quota(4000, true))
   ```



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -141,9 +142,9 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
       // Set default <user> quota config
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.DefaultUserEntity), 
-        None, 
-        Some(new Quota(10, true))
+        Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE),
+        Optional.empty,
+        Optional.of(new  Quota(10, true))

Review Comment:
   ditto
   ```suggestion
           Optional.of(new Quota(10, true))
   ```



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -329,40 +330,40 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
       // Update quotas
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")), 
-        None, 
-        Some(new Quota(8000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.empty,
+        Optional.of(new Quota(8000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")),
-        Some(ClientQuotaManager.ClientIdEntity("client1")),
-        Some(new Quota(10000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.of(new ClientQuotaManager.ClientIdEntity("client1")),
+        Optional.of(new  Quota(10000, true))

Review Comment:
   ditto
   ```suggestion
           Optional.of(new Quota(10000, true))
   ```



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -242,54 +243,54 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
     try {
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.DefaultUserEntity), 
-        None, 
-        Some(new Quota(1000, true))
+        Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE),
+        Optional.empty,
+        Optional.of(new Quota(1000, true))
       )
       clientQuotaManager.updateQuota(
-        None,
-        Some(ClientQuotaManager.DefaultClientIdEntity),
-        Some(new Quota(2000, true))
+        Optional.empty,
+        Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE),
+        Optional.of(new  Quota(2000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.DefaultUserEntity),
-        Some(ClientQuotaManager.DefaultClientIdEntity), 
-        Some(new Quota(3000, true))
+        Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE),
+        Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE),
+        Optional.of(new Quota(3000, true))
       )
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")), 
-        None, 
-        Some(new Quota(4000, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.empty,
+        Optional.of(new  Quota(4000, true))

Review Comment:
   ditto
   ```suggestion
           Optional.of(new Quota(4000, true))
   ```



##########
server/src/main/java/org/apache/kafka/server/ClientSensors.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents the sensors aggregated per client
+ * metricTags Quota metric tags for the client
+ * quotaSensor @Sensor that tracks the quota
+ * throttleTimeSensor @Sensor that tracks the throttle time
+ */
+public final class ClientSensors {
+    private final Map<String, String> metricTags;
+    private final Sensor quotaSensor;
+    private final Sensor throttleTimeSensor;
+
+    public ClientSensors(Map<String, String> metricTags,
+                         Sensor quotaSensor,
+                         Sensor throttleTimeSensor) {
+        this.metricTags = Map.copyOf(metricTags); // Defensive immutable copy
+        this.quotaSensor = Objects.requireNonNull(quotaSensor);
+        this.throttleTimeSensor = Objects.requireNonNull(throttleTimeSensor);
+    }
+
+    // Getters (equivalent to case class field access)
+    public Map<String, String> metricTags() {
+        return metricTags;
+    }
+
+    public Sensor quotaSensor() {
+        return quotaSensor;
+    }
+
+    public Sensor throttleTimeSensor() {
+        return throttleTimeSensor;
+    }
+
+    // equals, hashCode, toString (equivalent to case class generated methods)
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null || getClass() != obj.getClass()) return false;
+        ClientSensors that = (ClientSensors) obj;
+        return Objects.equals(metricTags, that.metricTags) &&
+                Objects.equals(quotaSensor, that.quotaSensor) &&
+                Objects.equals(throttleTimeSensor, that.throttleTimeSensor);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(metricTags, quotaSensor, throttleTimeSensor);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientSensors{" +
+                "metricTags=" + metricTags +
+                ", quotaSensor=" + quotaSensor +
+                ", throttleTimeSensor=" + throttleTimeSensor +
+                '}';
+    }

Review Comment:
   Why do we need this comment? If these methods are equivalent to the 
generated ones, why do we need to override them?



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -453,9 +454,9 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
     val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.PRODUCE, time, "")
     try {
       clientQuotaManager.updateQuota(
-        None,
-        Some(ClientQuotaManager.DefaultClientIdEntity),
-        Some(new Quota(500, true))
+        Optional.empty,
+        Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE),
+        Optional.of(new  Quota(500, true))

Review Comment:
   ditto
   ```suggestion
           Optional.of(new Quota(500, true))
   ```



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -217,17 +218,17 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
     try {
       // Set <user, client-id> quota config
       clientQuotaManager.updateQuota(
-        Some(ClientQuotaManager.UserEntity("userA")),
-        Some(ClientQuotaManager.ClientIdEntity("client1")), 
-        Some(new Quota(10, true))
+        Optional.of(new ClientQuotaManager.UserEntity("userA")),
+        Optional.of(new ClientQuotaManager.ClientIdEntity("client1")),
+        Optional.of(new  Quota(10, true))

Review Comment:
   ditto
   ```suggestion
           Optional.of(new Quota(10, true))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to