m1a2st commented on code in PR #19807: URL: https://github.com/apache/kafka/pull/19807#discussion_r2111445365
########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -242,54 +243,54 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { try { clientQuotaManager.updateQuota( - Some(ClientQuotaManager.DefaultUserEntity), - None, - Some(new Quota(1000, true)) + Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE), + Optional.empty, + Optional.of(new Quota(1000, true)) ) clientQuotaManager.updateQuota( - None, - Some(ClientQuotaManager.DefaultClientIdEntity), - Some(new Quota(2000, true)) + Optional.empty, + Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE), + Optional.of(new Quota(2000, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(2000, true)) ``` ########## server/src/main/java/org/apache/kafka/server/ControllerMutationQuotaManager.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.TokenBucket; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.QuotaType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; + + +/** + * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used in the context + * of throttling controller's operations/mutations. + * config @ClientQuotaManagerConfig quota configs + * metrics @Metrics Metrics instance + * time @Time object to use + * threadNamePrefix The thread prefix to use + * quotaCallback @ClientQuotaCallback ClientQuotaCallback to use Review Comment: It’s better to move these @param tags to the constructor. Class-level JavaDoc should not include @param tags. 
########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -242,54 +243,54 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { try { clientQuotaManager.updateQuota( - Some(ClientQuotaManager.DefaultUserEntity), - None, - Some(new Quota(1000, true)) + Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE), + Optional.empty, + Optional.of(new Quota(1000, true)) ) clientQuotaManager.updateQuota( - None, - Some(ClientQuotaManager.DefaultClientIdEntity), - Some(new Quota(2000, true)) + Optional.empty, + Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE), + Optional.of(new Quota(2000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.DefaultUserEntity), - Some(ClientQuotaManager.DefaultClientIdEntity), - Some(new Quota(3000, true)) + Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE), + Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE), + Optional.of(new Quota(3000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - None, - Some(new Quota(4000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.empty, + Optional.of(new Quota(4000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - Some(ClientQuotaManager.ClientIdEntity("client1")), - Some(new Quota(5000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.of(new ClientQuotaManager.ClientIdEntity("client1")), + Optional.of(new Quota(5000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userB")), - None, - Some(new Quota(6000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userB")), + Optional.empty, + Optional.of(new Quota(6000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userB")), - Some(ClientQuotaManager.ClientIdEntity("client1")), - Some(new Quota(7000, true)) + Optional.of(new 
ClientQuotaManager.UserEntity("userB")), + Optional.of(new ClientQuotaManager.ClientIdEntity("client1")), + Optional.of(new Quota(7000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userB")), - Some(ClientQuotaManager.DefaultClientIdEntity), - Some(new Quota(8000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userB")), + Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE), + Optional.of(new Quota(8000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userC")), - None, - Some(new Quota(10000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userC")), + Optional.empty, + Optional.of(new Quota(10000, true)) ) clientQuotaManager.updateQuota( - None, - Some(ClientQuotaManager.ClientIdEntity("client1")), - Some(new Quota(9000, true)) + Optional.empty, + Optional.of(new ClientQuotaManager.ClientIdEntity("client1")), + Optional.of(new Quota(9000, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(9000, true)) ``` ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -329,40 +330,40 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { // Update quotas clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - None, - Some(new Quota(8000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.empty, + Optional.of(new Quota(8000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - Some(ClientQuotaManager.ClientIdEntity("client1")), - Some(new Quota(10000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.of(new ClientQuotaManager.ClientIdEntity("client1")), + Optional.of(new Quota(10000, true)) ) checkQuota(clientQuotaManager, "userA", "client2", 8000, 0, expectThrottle = false) checkQuota(clientQuotaManager, "userA", "client2", 8000, 4500, expectThrottle = true) // Throttled due to sum of new and earlier 
values checkQuota(clientQuotaManager, "userA", "client1", 10000, 0, expectThrottle = false) checkQuota(clientQuotaManager, "userA", "client1", 10000, 6000, expectThrottle = true) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - Some(ClientQuotaManager.ClientIdEntity("client1")), - None + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.of(new ClientQuotaManager.ClientIdEntity("client1")), + Optional.empty ) checkQuota(clientQuotaManager, "userA", "client6", 8000, 0, expectThrottle = true) // Throttled due to shared user quota clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - Some(ClientQuotaManager.ClientIdEntity("client6")), - Some(new Quota(11000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.of(new ClientQuotaManager.ClientIdEntity("client6")), + Optional.of(new Quota(11000, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(11000, true)) ``` ########## server/src/main/java/org/apache/kafka/server/AbstractControllerMutationQuota.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server; + +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.utils.Time; + +/** + * The AbstractControllerMutationQuota is the base class of StrictControllerMutationQuota and + * PermissiveControllerMutationQuota. + * time @Time object to use Review Comment: ditto ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -190,17 +191,17 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { try { // Set <user> quota config clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - None, - Some(new Quota(10, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.empty, + Optional.of(new Quota(10, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(10, true)) ``` ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -84,12 +85,12 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { clientQuotaManager.updateQuota( client1.configUser, client1.configClientEntity, - None + Optional.empty ) clientQuotaManager.updateQuota( defaultConfigClient.configUser, defaultConfigClient.configClientEntity, - Some(new Quota(4000, true)) + Optional.of(new Quota(4000, true)) Review Comment: There's an extra space here. 
```suggestion Optional.of(new Quota(4000, true)) ``` ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -141,9 +142,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { // Set default <user> quota config clientQuotaManager.updateQuota( - Some(ClientQuotaManager.DefaultUserEntity), - None, - Some(new Quota(10, true)) + Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE), + Optional.empty, + Optional.of(new Quota(10, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(10, true)) ``` ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -329,40 +330,40 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { // Update quotas clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - None, - Some(new Quota(8000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.empty, + Optional.of(new Quota(8000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - Some(ClientQuotaManager.ClientIdEntity("client1")), - Some(new Quota(10000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.of(new ClientQuotaManager.ClientIdEntity("client1")), + Optional.of(new Quota(10000, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(10000, true)) ``` ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -242,54 +243,54 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { try { clientQuotaManager.updateQuota( - Some(ClientQuotaManager.DefaultUserEntity), - None, - Some(new Quota(1000, true)) + Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE), + Optional.empty, + Optional.of(new Quota(1000, true)) ) clientQuotaManager.updateQuota( - None, - Some(ClientQuotaManager.DefaultClientIdEntity), - Some(new Quota(2000, true)) + Optional.empty, + 
Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE), + Optional.of(new Quota(2000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.DefaultUserEntity), - Some(ClientQuotaManager.DefaultClientIdEntity), - Some(new Quota(3000, true)) + Optional.of(ClientQuotaManager.DefaultUserEntity.INSTANCE), + Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE), + Optional.of(new Quota(3000, true)) ) clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - None, - Some(new Quota(4000, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.empty, + Optional.of(new Quota(4000, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(4000, true)) ``` ########## server/src/main/java/org/apache/kafka/server/ClientSensors.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server; + +import org.apache.kafka.common.metrics.Sensor; + +import java.util.Map; +import java.util.Objects; + +/** + * Represents the sensors aggregated per client + * metricTags Quota metric tags for the client + * quotaSensor @Sensor that tracks the quota + * throttleTimeSensor @Sensor that tracks the throttle time + */ +public final class ClientSensors { + private final Map<String, String> metricTags; + private final Sensor quotaSensor; + private final Sensor throttleTimeSensor; + + public ClientSensors(Map<String, String> metricTags, + Sensor quotaSensor, + Sensor throttleTimeSensor) { + this.metricTags = Map.copyOf(metricTags); // Defensive immutable copy + this.quotaSensor = Objects.requireNonNull(quotaSensor); + this.throttleTimeSensor = Objects.requireNonNull(throttleTimeSensor); + } + + // Getters (equivalent to case class field access) + public Map<String, String> metricTags() { + return metricTags; + } + + public Sensor quotaSensor() { + return quotaSensor; + } + + public Sensor throttleTimeSensor() { + return throttleTimeSensor; + } + + // equals, hashCode, toString (equivalent to case class generated methods) + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ClientSensors that = (ClientSensors) obj; + return Objects.equals(metricTags, that.metricTags) && + Objects.equals(quotaSensor, that.quotaSensor) && + Objects.equals(throttleTimeSensor, that.throttleTimeSensor); + } + + @Override + public int hashCode() { + return Objects.hash(metricTags, quotaSensor, throttleTimeSensor); + } + + @Override + public String toString() { + return "ClientSensors{" + + "metricTags=" + metricTags + + ", quotaSensor=" + quotaSensor + + ", throttleTimeSensor=" + throttleTimeSensor + + '}'; + } Review Comment: Why do we need this comment? If these methods are equivalent to the generated ones, why do we need to override them? 
########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -453,9 +454,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "") try { clientQuotaManager.updateQuota( - None, - Some(ClientQuotaManager.DefaultClientIdEntity), - Some(new Quota(500, true)) + Optional.empty, + Optional.of(ClientQuotaManager.DefaultClientIdEntity.INSTANCE), + Optional.of(new Quota(500, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(500, true)) ``` ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -217,17 +218,17 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { try { // Set <user, client-id> quota config clientQuotaManager.updateQuota( - Some(ClientQuotaManager.UserEntity("userA")), - Some(ClientQuotaManager.ClientIdEntity("client1")), - Some(new Quota(10, true)) + Optional.of(new ClientQuotaManager.UserEntity("userA")), + Optional.of(new ClientQuotaManager.ClientIdEntity("client1")), + Optional.of(new Quota(10, true)) Review Comment: ditto ```suggestion Optional.of(new Quota(10, true)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org