dajac commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2123076152


##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,35 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR 
combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled =  (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    quotaTypesEnabled =  if (clientQuotaCallbackPlugin.isDefined) {

Review Comment:
   nit: There is an extra space after `=`.



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,35 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR 
combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled =  (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    quotaTypesEnabled =  if (clientQuotaCallbackPlugin.isDefined) {
+        QuotaTypes.CustomQuotas
+      } else {
+        QuotaTypes.NoQuotas
+      }
+
+    activeQuotaEntities.forEach {
+        case KafkaQuotaEntity(Some(_), Some(_)) =>
+          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+        case KafkaQuotaEntity(Some(_), None) =>
+          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+        case KafkaQuotaEntity(None, Some(_)) =>
+          quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+        case _ => // Unexpected entity type
+    }

Review Comment:
   I suppose that `activeQuotaEntities` could be rather large. Does it have any 
performance impact? We are basically moving from an O(1) operation to an O(N) one here.



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,35 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR 
combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled =  (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    quotaTypesEnabled =  if (clientQuotaCallbackPlugin.isDefined) {
+        QuotaTypes.CustomQuotas
+      } else {
+        QuotaTypes.NoQuotas
+      }
+
+    activeQuotaEntities.forEach {
+        case KafkaQuotaEntity(Some(_), Some(_)) =>
+          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+        case KafkaQuotaEntity(Some(_), None) =>
+          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+        case KafkaQuotaEntity(None, Some(_)) =>
+          quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+        case _ => // Unexpected entity type
+    }
+
+    val activeEntities = if (activeQuotaEntities.isEmpty) "No active quota 
entities" else activeQuotaEntities.asScala.map(_.toString).mkString(", ")
+    info(s"Quota types enabled has been changed with active quota entities: 
[$activeEntities]")

Review Comment:
   Generating a long string could be costly. Does it bring real value? Is this 
something that we could log at debug level instead?



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +426,19 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     try {
       val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
 
-      if (userEntity.nonEmpty) {
-        if (quotaEntity.clientIdEntity.nonEmpty)
-          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-        else
-          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-      } else if (clientEntity.nonEmpty)
-        quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
       quota match {
-        case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, 
quotaEntity, newQuota.bound)
-        case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+        case Some(newQuota) =>
+          quotaCallback.updateQuota(clientQuotaType, quotaEntity, 
newQuota.bound)
+          if(activeQuotaEntities.add(quotaEntity)){

Review Comment:
   nit: A space is missing after `if` and another before `{`.



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -533,6 +561,9 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     throttledChannelReaper.awaitShutdown()
   }
 
+  // Visible for testing
+  def getQuotaTypesEnabled: Int = quotaTypesEnabled

Review Comment:
   nit: Should we just make `quotaTypesEnabled` package private?



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +426,19 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     try {
       val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
 
-      if (userEntity.nonEmpty) {
-        if (quotaEntity.clientIdEntity.nonEmpty)
-          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-        else
-          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-      } else if (clientEntity.nonEmpty)
-        quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
       quota match {
-        case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, 
quotaEntity, newQuota.bound)
-        case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+        case Some(newQuota) =>
+          quotaCallback.updateQuota(clientQuotaType, quotaEntity, 
newQuota.bound)
+          if(activeQuotaEntities.add(quotaEntity)){
+            updateQuotaTypes()
+          }
+        case None =>
+          quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+          if (activeQuotaEntities.remove(quotaEntity)){

Review Comment:
   nit: A space is missing before `{`.



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,35 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR 
combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled =  (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    quotaTypesEnabled =  if (clientQuotaCallbackPlugin.isDefined) {

Review Comment:
   `quotaTypesEnabled` is a volatile variable. This suggests that we can access 
it at any time. In this method, we update it in a non-atomic manner. I wonder if 
this could have any undesired impact. It would be better to use a local 
variable and to update `quotaTypesEnabled` at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to