[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r526525212 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1476,33 +1650,36 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given * listener or broker-wide, if listener is not provided. * @param quotaLimit connection creation rate quota - * @param listenerOpt listener name if sensor is for a listener + * @param connectionQuotaEntity entity to create the sensor for */ - private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { -val sensorName = listenerOpt.map(listener => s"ConnectionAcceptRate-$listener").getOrElse("ConnectionAcceptRate") -val sensor = metrics.sensor(sensorName, rateQuotaMetricConfig(quotaLimit)) -sensor.add(connectionRateMetricName(listenerOpt), new Rate, null) -info(s"Created $sensorName sensor, quotaLimit=$quotaLimit") -sensor + private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Sensor = { +Option(metrics.getSensor(connectionQuotaEntity.sensorName)).getOrElse { Review comment: I will add this detail to `recordIpConnectionMaybeThrottle`. I think that is a more fitting place to put the comment, since that's the section of code that calls `connectionRateForIp` and `getOrCreateConnectionRateQuotaSensor` which are the components we need to be atomic. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r526528689 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: Thanks. I filed https://issues.apache.org/jira/browse/KAFKA-10744 for the clean up/conversion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r526527000 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1404,60 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: Option[Int]): Unit = synchronized { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +ip match { + case Some(address) => +counts.synchronized { + maxConnectionRate match { +case Some(rate) => + info(s"Updating max connection rate override for $address to $rate") + connectionRatePerIp.put(address, rate) +case None => + info(s"Removing max connection rate override for $address") + connectionRatePerIp.remove(address) + } +} +updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address)) + case None => +counts.synchronized { + 
defaultConnectionRatePerIp = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate) +} +info(s"Updated default max IP connection rate to $defaultConnectionRatePerIp") +metrics.metrics.forEach { (metricName, metric) => + if (isIpConnectionRateMetric(metricName)) { +val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(IpMetricTag))) Review comment: reasons for using `InetAddress` would be that it's more consistent with the other IP-related data structures in `ConnectionQuotas`, `maxConnectionsPerIpOverrides` and `counts`. additionally, `inc()` is called with an `InetAddress`, so it's convenient to keep our data structure using `InetAddress` as a key so that we don't need to convert `InetAddress` => `String` in cases where we would not create a sensor (e.g., when IP quotas are disabled). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r526527000 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1404,60 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: Option[Int]): Unit = synchronized { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +ip match { + case Some(address) => +counts.synchronized { + maxConnectionRate match { +case Some(rate) => + info(s"Updating max connection rate override for $address to $rate") + connectionRatePerIp.put(address, rate) +case None => + info(s"Removing max connection rate override for $address") + connectionRatePerIp.remove(address) + } +} +updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address)) + case None => +counts.synchronized { + 
defaultConnectionRatePerIp = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate) +} +info(s"Updated default max IP connection rate to $defaultConnectionRatePerIp") +metrics.metrics.forEach { (metricName, metric) => + if (isIpConnectionRateMetric(metricName)) { +val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(IpMetricTag))) Review comment: reasons for using `InetAddress` would be that it's more consistent with the other IP-related data structures in `ConnectionQuotas`, `maxConnectionsPerIpOverrides` and `counts`. additionally, `inc()` is called with an `InetAddress`, so it's convenient to keep our data structure using `InetAddress` as a key so that we don't need to convert `InetAddress` => `String` in cases where we would skip creating a sensor (e.g., when IP quotas are disabled). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r526527000 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1404,60 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: Option[Int]): Unit = synchronized { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +ip match { + case Some(address) => +counts.synchronized { + maxConnectionRate match { +case Some(rate) => + info(s"Updating max connection rate override for $address to $rate") + connectionRatePerIp.put(address, rate) +case None => + info(s"Removing max connection rate override for $address") + connectionRatePerIp.remove(address) + } +} +updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address)) + case None => +counts.synchronized { + 
defaultConnectionRatePerIp = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate) +} +info(s"Updated default max IP connection rate to $defaultConnectionRatePerIp") +metrics.metrics.forEach { (metricName, metric) => + if (isIpConnectionRateMetric(metricName)) { +val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(IpMetricTag))) Review comment: Reasons for using `InetAddress` would be that it's more consistent with the other IP-related data structures in `ConnectionQuotas`, e.g. `maxConnectionsPerIpOverrides` and `counts`. Additionally, `inc()` is called with an `InetAddress`, so it's convenient to keep our data structure using `InetAddress` as a key so that we don't need to convert `InetAddress` => `String` in cases where we would skip creating a sensor (e.g., when IP quotas are disabled). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r526525212 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1476,33 +1650,36 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given * listener or broker-wide, if listener is not provided. * @param quotaLimit connection creation rate quota - * @param listenerOpt listener name if sensor is for a listener + * @param connectionQuotaEntity entity to create the sensor for */ - private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { -val sensorName = listenerOpt.map(listener => s"ConnectionAcceptRate-$listener").getOrElse("ConnectionAcceptRate") -val sensor = metrics.sensor(sensorName, rateQuotaMetricConfig(quotaLimit)) -sensor.add(connectionRateMetricName(listenerOpt), new Rate, null) -info(s"Created $sensorName sensor, quotaLimit=$quotaLimit") -sensor + private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Sensor = { +Option(metrics.getSensor(connectionQuotaEntity.sensorName)).getOrElse { Review comment: I will add this detail to `recordIpConnectionMaybeThrottle`. I think that is a more fitting place to put the comment, since that's the section of code that calls `connectionRateForIp`, which is what we need to be threadsafe. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `updateIpConnectionRateQuota` synchronized on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that thread 1 reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then thread 2 updates connection rate and quota metric config to `B`, then thread 1 resumes execution and creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` only for updates to `connectionRateForIp/defaultConnectionRate`, we know that thread 1 that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before thread 2 acquires the `counts` lock and updates `connectionRateForIp/defaultConnectionRate` to `B`. After thread 2 releases the `counts` lock, subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. Thread 2 is then able to update any quota metrics from `A` to `B` without holding the `counts` lock, knowing that there are no operations that could have read the default connection rate limit as `A` without already having finished creating the sensor with quota as `A`, and that all subsequent quotas will be read and created as `B`. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp` while updating quota metrics, but we can just replace `mutable.Map` with `ConcurrentHashMap`, which is preferable to coarsely locking on `counts`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `updateIpConnectionRateQuota` synchronized on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that thread 1 reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then thread 2 updates connection rate and quota metric config to `B`, then thread 1 resumes execution and creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` only for updates to `connectionRateForIp/defaultConnectionRate`, we know that thread 1 that has read a connection rate quota as `A` will finish creating quota metrics with quota metric config `A` before thread 2 acquires the `counts` lock and updates `connectionRateForIp/defaultConnectionRate` to `B`. After thread 2 releases the `counts` lock, subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. Thread 2 is then able to update any quota metrics from `A` to `B` without holding the `counts` lock, knowing that there are no operations that could have read the default connection rate limit as `A` without already having finished creating the sensor with quota as `A`, and that all subsequent quotas will be read and created as `B`. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp` while updating quota metrics, but we can just replace `mutable.Map` with `ConcurrentHashMap`, which is preferable to coarsely locking on `counts`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r523682718 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the quota rate being enforced. I updated the IP throttling rate above/below tests that don't expect to block to use `acceptConnectionsAndVerifyRate` with `MockTime` to verify that the expected quota rate holds while getting throttled. It's a little messy, because a lot of the other tests for broker/listener quotas have to use `Time.SYSTEM` due to monitor waiting. I think it should also be possible to rewrite the broker/listener connection rate quota tests to use mock time by extending the `Time` interface, but I would prefer to do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r523682718 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the quota rate being enforced. I updated the IP throttling rate above/below tests that don't expect to block to instead use `acceptConnectionsAndVerifyRate` with `MockTime`. It's a little messy, because a lot of the other tests for broker/listener quotas have to use `Time.SYSTEM` due to monitor waiting. I think it should also be possible to rewrite the broker/listener connection rate quota tests to use mock time by extending the `Time` interface, but I would prefer to do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r523682718 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the quota rate being enforced. I updated the IP throttling rate above/below tests that don't expect to block to instead use `acceptConnectionsAndVerifyRate` with `MockTime`. It's a little messy, because a lot of the other tests for broker/listener quotas have to use `Time.SYSTEM` due to monitor waiting. I think it should also be possible to update the broker/listener quota tests in `ConnectionQuotasTest` and `DynamicConnectionQuotaTest` to use mock time by extending the `Time` interface, but I would prefer to do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r523682718 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the quota rate being enforced. I updated the IP throttling tests that don't expect to block to instead use `acceptConnectionsAndVerifyRate` with `MockTime`. It's a little messy, because a lot of the other tests for broker/listener quotas have to use `Time.SYSTEM` due to monitor waiting. I think it should also be possible to update the broker/listener quota tests in `ConnectionQuotasTest` and `DynamicConnectionQuotaTest` to use mock time by extending the `Time` interface, but I would prefer to do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r523682718 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the throttle rate being enforced. I updated the IP throttling tests that don't expect to block to instead use `acceptConnectionsAndVerifyRate` with `MockTime`. I think it should also be possible to update the broker/listener quota tests in `ConnectionQuotasTest` and `DynamicConnectionQuotaTest` to use mock time by extending the `Time` interface, but I would prefer to do that in a follow-up PR. 
## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the quota rate being enforced. I updated the IP throttling tests that don't expect to block to instead use `acceptConnectionsAndVerifyRate` with `MockTime`. I think it should also be possible to update the broker/listener quota tests in `ConnectionQuotasTest` and `DynamicConnectionQuotaTest` to use mock time by extending the `Time` interface, but I would prefer to do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r523682718 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the throttle rate. I updated the IP throttling tests that don't expect to block to instead use `acceptConnectionsAndVerifyRate` with `MockTime`. I think it should also be possible to update the broker/listener quota tests in `ConnectionQuotasTest` and `DynamicConnectionQuotaTest` to use mock time by extending the `Time` interface, but I would prefer to do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521566585 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the throttle rate. I updated the IP throttling tests that don't expect to block to instead use `acceptConnectionsAndVerifyRate` with `MockTime`. I think it should also be possible to update the broker/listener quota tests in `ConnectionQuotasTest` and `DynamicConnectionQuotaTest` to use mock time by extending the `Time` interface, but I would prefer to do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521566585 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to testing the throttle rate. I updated the IP throttling tests that don't expect to block to instead use `acceptConnectionsAndVerifyRate` with `MockTime`. I think it should also be possible to update the broker/listener tests in `ConnectionQuotasTest` to use mock time by extending the `Time` interface for waiting on an object monitor, but I would prefer to do that in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `updateIpConnectionRateQuota` synchronized on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that thread 1 reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then thread 2 updates connection rate and quota metrics to `B`, then thread 1 resumes execution and creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that thread 1 that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before thread 2 acquires the `counts` lock and updates `connectionRateForIp/defaultConnectionRate` to `B`. After thread 2 releases the `counts` lock, subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. Thread 2 will then be able to update any quota metrics from `A` to `B`, without holding the `counts` lock, knowing that there are no operations that could have read the default connection rate limit as `A` without already having finished creating the sensor with quota as `A`, and that all subsequent quotas will be read and created as `B`. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp` while updating quota metrics, but we can just replace `mutable.Map` with `ConcurrentHashMap` which is preferable to coarsely locking on `counts`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `updateIpConnectionRateQuota` synchronized on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that thread 1 reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then thread 2 updates connection rate and quota metrics to `B`, then thread 1 resumes execution and creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that thread 1 that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before thread 2 acquires the `counts` lock and updates `connectionRateForIp/defaultConnectionRate` to `B`. After thread 2 releases the `counts` lock, subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. Thread 2 will then be able to update any quota metrics from `A` to `B`, without holding the `counts` lock, knowing that there are no operations that could have read the default connection rate limit as `A` without already having finished creating the sensor with quota as `A`. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp` while updating quota metrics, but we can just replace `mutable.Map` with `ConcurrentHashMap` which is preferable to coarsely locking on `counts`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `updateIpConnectionRateQuota` synchronized on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that another thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then we update connection rate and quota metrics to `B`, then the other thread creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that any thread that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before we acquire the `counts` lock. After acquiring the `counts` lock and updating `connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. We will then be able to update any quota metrics from `A` to `B`, knowing that there are no operations that could have read the connection rate limit as `A` without already having created the quota metric. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp` while updating quota metrics, but we can just replace `mutable.Map` with `ConcurrentHashMap` which is preferable to coarsely locking on `counts`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521549126 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val recvBufferSize: Int, brokerId: Int, connectionQuotas: ConnectionQuotas, - metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + metricPrefix: String, + time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() private val processorsStarted = new AtomicBoolean private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value)) + private var currentProcessorIndex = 0 + private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] { +override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs + } + private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() Review comment: I explained elsewhere, but my comment may have gotten lost in the weeds. For reference: I did not use the java DelayQueue similar to the implementation in ClientQuotaManager because this throttling implementation does not use timeout-based polling or require a synchronized data structure, and there's a bit more boilerplate needed for using a DelayQueue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `updateIpConnectionRateQuota` synchronized on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that another thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then we update connection rate and quota metrics to `B`, then the other thread creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that any thread that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before we acquire the `counts` lock. After acquiring the `counts` lock and updating `connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. We will then be able to update any quota metrics from `A` to `B`, knowing that there are no operations that could have read the connection rate limit as `A` without already having created the quota metric. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp` while updating quota metrics, but we can just replace `mutable.Map` with `ConcurrentHashMap` which is preferable to coarsely locking on `counts`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `IpConfigHandler` synchronize on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that another thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then we update connection rate and quota metrics to `B`, then the other thread creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that any thread that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before we acquire the `counts` lock. After acquiring the `counts` lock and updating `connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. We will then be able to update any quota metrics from `A` to `B`, knowing that there are no operations that could have read the connection rate limit as `A` without already having created the quota metric. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp` while updating quota metrics, but we can just replace `mutable.Map` with `ConcurrentHashMap` which is preferable to coarsely locking on `counts`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `IpConfigHandler` synchronize on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that another thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then we update connection rate and quota metrics to `B`, then the other thread creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that any thread that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before we acquire the `counts` lock. After acquiring the `counts` lock and updating `connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. We will then be able to update any quota metrics from `A` to `B`, knowing that there are no operations that could have read the connection rate limit as `A` without already having created the quota metric. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp`, but we can just replace `mutable.Map` with `ConcurrentHashMap` which is preferable to coarsely locking on `counts`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: After thinking on this a bit more, I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `IpConfigHandler` synchronize on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that another thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then we update connection rate and quota metrics to `B`, then the other thread creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that any thread that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before we acquire the `counts` lock. After acquiring the `counts` lock and updating `connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. We will then be able to update any quota metrics from `A` to `B`, knowing that there are no operations that could have read the connection rate limit as `A` without already having created the quota metric. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp`, but we can just replace `mutable.Map` with `ConcurrentHashMap`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: I'm thinking on this a bit more, and I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `IpConfigHandler` synchronize on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that another thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then we update connection rate and quota metrics to `B`, then the other thread creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that any thread that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before we acquire the `counts` lock. After acquiring the `counts` lock and updating `connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. We will then be able to update any quota metrics from `A` to `B`, knowing that there are no operations that could have read the connection rate limit as `A` without already having created the quota metric. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp`, but we can just replace `mutable.Map` with `ConcurrentHashMap`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: I'm thinking on this a bit more, and I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `IpConfigHandler` synchronize on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that another thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then we update connection rate and quota metrics to `B`, then the other thread creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that any thread that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before we acquire the `counts` lock. After acquiring the `counts` lock and updating `connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. We will then be able to update any quota metrics from `A` to `B`, knowing that all new ones will be created as `B`. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp`, but we can just replace `mutable.Map` with `ConcurrentHashMap`. Let me know if I'm missing something here with respect to thread safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: I'm thinking on this a bit more, and I think that only locking on updates to `defaultConnectionRatePerIp` should be sufficient for correctness. ZK dynamic config changes are processed within one thread, so we will only have one thread executing in `updateIpConnectionRateQuota`. If we want to be really careful about this, we can have `IpConfigHandler` synchronize on `ConnectionQuotas`. 
The case we want to avoid by synchronizing on `counts` is that another thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling `inc()`, then we update connection rate and quota metrics to `B`, then the other thread creates a sensor/metric with quota limit `A` => inconsistency. If we synchronize on `counts` for only updates to `connectionRateForIp/defaultConnectionRate`, we know that any thread that has read a connection rate quota as `A` will finish creating quota metrics with limit `A` before we acquire the `counts` lock. After acquiring the `counts` lock and updating `connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads calling `inc()` will read the quota as `B` and create a metric as `B`. We will then be able to update any quota metrics from `A` to `B`, knowing that all new ones will be created as `B`. The only issue remaining is that we can get concurrent reads of `connectionRatePerIp`, but we can just replace `mutable.Map` with `ConcurrentHashMap`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521549126 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val recvBufferSize: Int, brokerId: Int, connectionQuotas: ConnectionQuotas, - metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + metricPrefix: String, + time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() private val processorsStarted = new AtomicBoolean private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value)) + private var currentProcessorIndex = 0 + private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] { +override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs + } + private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() Review comment: I explained elsewhere, but my comment on this may have gotten lost in the weeds. for reference: I did not use the java DelayQueue similar to the implementation in ClientQuotaManager because this throttling implementation does not use timeout-based polling or require a synchronized data structure, and there's a bit more boilerplate needed for a DelayQueue to support timeout-based polling. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521545712 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: yeah, it's undesirable behavior. However, this is in line with `ClientQuotaManager.updateQuota`, which write locks when updating quota, and also has to iterate over all metrics for any default quota updates. I can't think of a good alternative, but I'm open to suggestions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521614527 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { + ip match { +case Some(addr) => + val address = InetAddress.getByName(addr) Review comment: Moved IP resolution to `IpConfigHandler`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521549126 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val recvBufferSize: Int, brokerId: Int, connectionQuotas: ConnectionQuotas, - metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + metricPrefix: String, + time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() private val processorsStarted = new AtomicBoolean private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value)) + private var currentProcessorIndex = 0 + private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] { +override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs + } + private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() Review comment: my comment on this may have gotten lost in the weeds. for reference: I did not use the java DelayQueue similar to the implementation in ClientQuotaManager because this throttling implementation does not use timeout-based polling or require a synchronized data structure, and there's a bit more boilerplate needed for a DelayQueue to support timeout-based polling. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521566585 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, -timeIntervalMs: Long) : Unit = { +timeIntervalMs: Long, +expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs +var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { +connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { +case e: ConnectionThrottledException => + if (!expectIpThrottle) +throw e + ipThrottled = true Review comment: a lot of the tests in `ConnectionQuotasTest` are using system time and are looking to reach a connection rate of N/s. so it may be the case that `ipThrottled` will remain true, but we want to verify that we can hit a target connection rate, even if connections in the middle of the interval get throttled. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521564771 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -477,6 +568,99 @@ class ConnectionQuotasTest { assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to be recorded", blockedPercentMeters("EXTERNAL").count() > 0) } + @Test + def testIpConnectionRateUpdate(): Unit = { +val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) +connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) +connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName) +connectionQuotas.addListener(config, listeners("ADMIN").listenerName) +connectionQuotas.addListener(config, listeners("REPLICATION").listenerName) +val defaultIpRate = 50 +val defaultOverrideRate = 20 +val overrideIpRate = 30 +val externalListener = listeners("EXTERNAL") +val adminListener = listeners("ADMIN") +// set a non-unlimited default quota so that we create ip rate sensors/metrics +connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultIpRate)) +connectionQuotas.inc(externalListener.listenerName, externalListener.defaultIp, blockedPercentMeters("EXTERNAL")) +connectionQuotas.inc(adminListener.listenerName, adminListener.defaultIp, blockedPercentMeters("ADMIN")) + +// both IPs should have the default rate +verifyIpConnectionQuota(externalListener.defaultIp, defaultIpRate) +verifyIpConnectionQuota(adminListener.defaultIp, defaultIpRate) + +// external listener should have its in-memory quota and metric config updated + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), Some(overrideIpRate)) +verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate) + +// update default +connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultOverrideRate)) + +// external listener IP should not have its quota updated to the new default 
+verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate) +// admin listener IP should have its quota updated with to the new default +verifyIpConnectionQuota(adminListener.defaultIp, defaultOverrideRate) + +// remove default connection rate quota +connectionQuotas.updateIpConnectionRateQuota(None, None) +verifyIpConnectionQuota(adminListener.defaultIp, DynamicConfig.Ip.DefaultConnectionCreationRate) + +// remove override for external listener IP + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), None) +verifyIpConnectionQuota(externalListener.defaultIp, DynamicConfig.Ip.DefaultConnectionCreationRate) + } + + @Test + def testIpConnectionRateQuotaUpdate(): Unit = { Review comment: I renamed to `testIpConnectionRateMetricUpdate` and `testEnforcedIpConnectionRateQuotaUpdate` to clarify. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521549126 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val recvBufferSize: Int, brokerId: Int, connectionQuotas: ConnectionQuotas, - metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + metricPrefix: String, + time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() private val processorsStarted = new AtomicBoolean private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value)) + private var currentProcessorIndex = 0 + private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] { +override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs + } + private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() Review comment: my comment on this may have gotten lost in the weeds, for reference: I did not use the java DelayQueue similar to the implementation in ClientQuotaManager because this throttling implementation does not use timeout-based polling or require a synchronized data structure, and there's a bit more boilerplate needed for a DelayQueue to support timeout-based polling. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521545712 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} +counts.synchronized { Review comment: yeah, it's not desirable behavior. However, this is more or less in line with `ClientQuotaManager.updateQuota`, which write locks when updating quota, and also has to iterate over all metrics for any default quota updates. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r519054824 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1659,19 +1648,22 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend * @param connectionQuotaEntity entity to create the sensor for */ private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Sensor = { -sensorAccessor.getOrCreate( - connectionQuotaEntity.sensorName, - connectionQuotaEntity.sensorExpiration, - sensor => sensor.add(connectionRateMetricName(connectionQuotaEntity), new Rate, rateQuotaMetricConfig(quotaLimit)) -) +Option(metrics.getSensor(connectionQuotaEntity.sensorName)).getOrElse { + val sensor = metrics.sensor( +connectionQuotaEntity.sensorName, +rateQuotaMetricConfig(quotaLimit), +connectionQuotaEntity.sensorExpiration + ) + sensor.add(connectionRateMetricName(connectionQuotaEntity), new Rate, null) + sensor +} Review comment: I removed use of `sensorAccess`, since all calls to `sensorAccessor.getOrCreate` were performed with the `counts` lock, so the read-write lock is redundant. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r519054824 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1659,19 +1648,22 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend * @param connectionQuotaEntity entity to create the sensor for */ private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Sensor = { -sensorAccessor.getOrCreate( - connectionQuotaEntity.sensorName, - connectionQuotaEntity.sensorExpiration, - sensor => sensor.add(connectionRateMetricName(connectionQuotaEntity), new Rate, rateQuotaMetricConfig(quotaLimit)) -) +Option(metrics.getSensor(connectionQuotaEntity.sensorName)).getOrElse { + val sensor = metrics.sensor( +connectionQuotaEntity.sensorName, +rateQuotaMetricConfig(quotaLimit), +connectionQuotaEntity.sensorExpiration + ) + sensor.add(connectionRateMetricName(connectionQuotaEntity), new Rate, null) + sensor +} Review comment: I removed use of `sensorAccess`, since all calls to `sensorAccessor.getOrCreate` were performed with the `counts` lock. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r516997443 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint, info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.") close(endPoint.listenerName, socketChannel) None + case e: ConnectionThrottledException => +val ip = socketChannel.socket.getInetAddress +debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms") +val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new mutable.Queue[DelayedCloseSocket]) Review comment: Okay, I replaced the map/queue with a priority queue. I did not use the Java `DelayQueue`, as the implementation in `ClientQuotaManager` does, because this throttling implementation does not use timeout-based polling or require a synchronized data structure, and there's significantly more boilerplate needed for a `DelayQueue`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r516995083 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1207,14 +1286,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private val listenerCounts = mutable.Map[ListenerName, Int]() private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]() @volatile private var totalCount = 0 - + @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate + private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]() + private val lock = new ReentrantReadWriteLock() + private val sensorAccessor = new SensorAccess(lock, metrics) // sensor that tracks broker-wide connection creation rate and limit (quota) - private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate) + private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity) private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong) def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = { counts.synchronized { - waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter) + val startThrottleTimeMs = time.milliseconds + + waitForConnectionSlot(listenerName, startThrottleTimeMs, acceptorBlockedPercentMeter) + + val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs) Review comment: seems reasonable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r516301423 ## File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala ## @@ -240,6 +256,16 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { s"Admin client connection not closed (initial = $initialConnectionCount, current = $connectionCount)") } + private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): Unit = { +adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default), + CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, updatedRate.toString)) +// use a random throwaway address if ip isn't specified to get the default value +TestUtils.waitUntilTrue(() => servers.head.socketServer.connectionQuotas. + connectionRateForIp(InetAddress.getByName(ip.getOrElse("255.255.3.4"))) == updatedRate, Review comment: This is admittedly a little weird. If `None` is given as the IP, we want to update the default connection rate. To verify that the default connection rate was updated, we need to call `connectionRateForIp` with some arbitrary IP address that hasn't been given a specific override. In this case, I used an arbitrary IP, `255.255.3.4` as mentioned in the comment. ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way -updateConnectionRateQuota(maxConnectionRate) +updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. 
+ * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { +def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) +} + +def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound +} + +ip match { + case Some(addr) => +val address = InetAddress.getByName(addr) +maxConnectionRate match { + case Some(rate) => +info(s"Updating max connection rate override for $address to $rate") +connectionRatePerIp.put(address, rate) + case None => +info(s"Removing max connection rate override for $address") +connectionRatePerIp.remove(address) +} +updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address)) + case None => +val newQuota = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate) +info(s"Updating default max IP connection rate to $newQuota") +defaultConnectionRatePerIp = newQuota +val allMetrics = metrics.metrics +allMetrics.forEach { (metricName, metric) => + if (isIpConnectionRateMetric(metricName) && shouldUpdateQuota(metric, newQuota)) { +info(s"Updating existing connection rate sensor for ${metricName.tags} to $newQuota") +metric.config(rateQuotaMetricConfig(newQuota)) + } Review comment: @dajac good catch, we shouldn't be using newQuota here. I agree, this should have been covered by testing, I'll work on adding some more unit tests for the metric config updating. @apovzner It should handle that case, yeah. If default is set to some value, when we remove quota for an ip, e.g. 
`updateIpConnectionRate(Some(ip), None)`, we remove the connection rate entry from the map and then call `getOrDefault(ip, defaultConnectionRatePerIp)`, which should be whatever the non-unlimited per-IP default quota is. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org