dajac commented on code in PR #19742: URL: https://github.com/apache/kafka/pull/19742#discussion_r2123076152
########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +450,35 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Helper method to update quotaTypesEnabled which is a bitwise OR combination of the enabled quota types. + * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 + */ + private def updateQuotaTypes(): Unit = { + quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) { Review Comment: nit: There is an extra space after `=`. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +450,35 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Helper method to update quotaTypesEnabled which is a bitwise OR combination of the enabled quota types. 
+ * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 + */ + private def updateQuotaTypes(): Unit = { + quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) { + QuotaTypes.CustomQuotas + } else { + QuotaTypes.NoQuotas + } + + activeQuotaEntities.forEach { + case KafkaQuotaEntity(Some(_), Some(_)) => + quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled + case KafkaQuotaEntity(Some(_), None) => + quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled + case KafkaQuotaEntity(None, Some(_)) => + quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled + case _ => // Unexpected entity type + } Review Comment: I suppose that `activeQuotaEntities` could be rather large. Does it have any performance impact? We are basically moving from an O(1) operation to O(N) here. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +450,35 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Helper method to update quotaTypesEnabled which is a bitwise OR combination of the enabled quota types. 
+ * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 + */ + private def updateQuotaTypes(): Unit = { + quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) { + QuotaTypes.CustomQuotas + } else { + QuotaTypes.NoQuotas + } + + activeQuotaEntities.forEach { + case KafkaQuotaEntity(Some(_), Some(_)) => + quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled + case KafkaQuotaEntity(Some(_), None) => + quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled + case KafkaQuotaEntity(None, Some(_)) => + quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled + case _ => // Unexpected entity type + } + + val activeEntities = if (activeQuotaEntities.isEmpty) "No active quota entities" else activeQuotaEntities.asScala.map(_.toString).mkString(", ") + info(s"Quota types enabled has been changed with active quota entities: [$activeEntities]") Review Comment: Generating a long string could be costly. Does it bring real value? Is this something that we could log at debug level? 
########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -428,18 +426,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, try { val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) - if (userEntity.nonEmpty) { - if (quotaEntity.clientIdEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled - else - quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled - quota match { - case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) - case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) + case Some(newQuota) => + quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) + if(activeQuotaEntities.add(quotaEntity)){ Review Comment: nit: A space is missing after `if` and another before `{`. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -533,6 +561,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, throttledChannelReaper.awaitShutdown() } + // Visible for testing + def getQuotaTypesEnabled: Int = quotaTypesEnabled Review Comment: nit: Should we just make `quotaTypesEnabled` package-private? 
########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -428,18 +426,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, try { val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) - if (userEntity.nonEmpty) { - if (quotaEntity.clientIdEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled - else - quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled - quota match { - case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) - case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) + case Some(newQuota) => + quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) + if(activeQuotaEntities.add(quotaEntity)){ + updateQuotaTypes() + } + case None => + quotaCallback.removeQuota(clientQuotaType, quotaEntity) + if (activeQuotaEntities.remove(quotaEntity)){ Review Comment: nit: A space is missing before `{`. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +450,35 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Helper method to update quotaTypesEnabled which is a bitwise OR combination of the enabled quota types. + * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 + */ + private def updateQuotaTypes(): Unit = { + quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) { Review Comment: `quotaTypesEnabled` is a volatile variable. This suggests that we can access it anytime. In this method, we update it in a non-atomic manner. 
I wonder if this could have any undesired impact. It would be better to use a local variable and to update `quotaTypesEnabled` at the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org