[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-18 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r526525212



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1476,33 +1650,36 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
    * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
    * listener or broker-wide, if listener is not provided.
    * @param quotaLimit connection creation rate quota
-   * @param listenerOpt listener name if sensor is for a listener
+   * @param connectionQuotaEntity entity to create the sensor for
    */
-  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
-    val sensorName = listenerOpt.map(listener => s"ConnectionAcceptRate-$listener").getOrElse("ConnectionAcceptRate")
-    val sensor = metrics.sensor(sensorName, rateQuotaMetricConfig(quotaLimit))
-    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null)
-    info(s"Created $sensorName sensor, quotaLimit=$quotaLimit")
-    sensor
+  private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Sensor = {
+    Option(metrics.getSensor(connectionQuotaEntity.sensorName)).getOrElse {

Review comment:
   I will add this detail to `recordIpConnectionMaybeThrottle`. I think that is a more fitting place for the comment, since that is the section of code that calls `connectionRateForIp` and `getOrCreateConnectionRateQuotaSensor`, which are the components that need to be atomic.
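
   As a rough illustration of the get-or-create pattern the new method relies on (a sketch with assumed names, not the PR code), where `Metrics.getSensor` returns `null` for a sensor that does not exist yet, and the quota read plus the sensor creation is what has to stay atomic:

```scala
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, Sensor}
import org.apache.kafka.common.metrics.stats.Rate

// Sketch only: get-or-create a rate sensor with a quota bound (assumed names).
object SensorSketch {
  def getOrCreateRateSensor(metrics: Metrics, sensorName: String, quotaLimit: Double): Sensor = {
    // getSensor returns null when no sensor with this name has been registered yet
    Option(metrics.getSensor(sensorName)).getOrElse {
      val config = new MetricConfig().quota(Quota.upperBound(quotaLimit))
      val sensor = metrics.sensor(sensorName, config)
      sensor.add(metrics.metricName("connection-accept-rate", "socket-server-metrics"), new Rate())
      sensor
    }
  }
}
```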





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-18 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r526528689



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -633,18 +837,27 @@ class ConnectionQuotasTest {
                          listenerName: ListenerName,
                          address: InetAddress,
                          numConnections: Long,
-                         timeIntervalMs: Long) : Unit = {
+                         timeIntervalMs: Long,
+                         expectIpThrottle: Boolean): Boolean = {
     var nextSendTime = System.currentTimeMillis + timeIntervalMs
+    var ipThrottled = false
     for (_ <- 0L until numConnections) {
       // this method may block if broker-wide or listener limit on the number of connections is reached
-      connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value))
-
+      try {
+        connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value))
+      } catch {
+        case e: ConnectionThrottledException =>
+          if (!expectIpThrottle)
+            throw e
+          ipThrottled = true

Review comment:
   Thanks. I filed https://issues.apache.org/jira/browse/KAFKA-10744 for the cleanup/conversion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-18 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r526527000



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1324,7 +1404,60 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: Option[Int]): Unit = synchronized {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+    ip match {
+      case Some(address) =>
+        counts.synchronized {
+          maxConnectionRate match {
+            case Some(rate) =>
+              info(s"Updating max connection rate override for $address to $rate")
+              connectionRatePerIp.put(address, rate)
+            case None =>
+              info(s"Removing max connection rate override for $address")
+              connectionRatePerIp.remove(address)
+          }
+        }
+        updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
+      case None =>
+        counts.synchronized {
+          defaultConnectionRatePerIp = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+        }
+        info(s"Updated default max IP connection rate to $defaultConnectionRatePerIp")
+        metrics.metrics.forEach { (metricName, metric) =>
+          if (isIpConnectionRateMetric(metricName)) {
+            val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(IpMetricTag)))

Review comment:
   Reasons for using `InetAddress`: it is more consistent with the other IP-related data structures in `ConnectionQuotas`, namely `maxConnectionsPerIpOverrides` and `counts`.
   
   Additionally, `inc()` is called with an `InetAddress`, so it is convenient to keep `InetAddress` as the key of our data structure; that way we don't need to convert `InetAddress` => `String` in cases where we would not create a sensor (e.g., when IP quotas are disabled).
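
   To make the trade-off concrete, a minimal sketch (field names assumed for illustration, not the PR code) of an `InetAddress`-keyed override map with a volatile default, which lets the `inc()` path resolve the effective rate without any `InetAddress` => `String` conversion:

```scala
import java.net.InetAddress
import java.util.concurrent.ConcurrentHashMap

// Sketch only: per-IP connection rate overrides keyed by InetAddress (assumed names).
class IpRateLookupSketch(initialDefault: Int) {
  @volatile private var defaultConnectionRatePerIp: Int = initialDefault
  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()

  // Called from the inc() path, which already holds an InetAddress; no String conversion needed.
  def connectionRateForIp(ip: InetAddress): Int =
    connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp)

  def setOverride(ip: InetAddress, rate: Option[Int]): Unit = {
    rate match {
      case Some(r) => connectionRatePerIp.put(ip, r)
      case None    => connectionRatePerIp.remove(ip)
    }
  }
}
```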





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-16 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+    counts.synchronized {

Review comment:
   After thinking on this a bit more, I think that only locking on updates 
to `defaultConnectionRatePerIp` should be sufficient for correctness.
   
   ZK dynamic config changes are processed within one thread, so we will only 
have one thread executing in `updateIpConnectionRateQuota`. If we want to be 
really careful about this, we can have `updateIpConnectionRateQuota` 
synchronized on `ConnectionQuotas`.
   
   The case we want to avoid by synchronizing on `counts` is that thread 1 
reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling 
`inc()`, then thread 2 updates connection rate and quota metric config to `B`, 
then thread 1 resumes execution and creates a sensor/metric with quota limit 
`A` => inconsistency.
   
   If we synchronize on `counts` only for updates to `connectionRateForIp`/`defaultConnectionRate`, we know that a thread 1 that has read a connection rate quota of `A` will finish creating its quota metrics with limit `A` before thread 2 acquires the `counts` lock and updates `connectionRateForIp`/`defaultConnectionRate` to `B`.
   
   After thread 2 releases the `counts` lock, subsequent threads calling `inc()` will read the quota as `B` and create metrics with `B`. Thread 2 can then update any quota metrics from `A` to `B` without holding the `counts` lock, knowing that no operation could have read the default connection rate limit as `A` without already having finished creating the sensor with quota `A`, and that all subsequent quotas will be read and created as `B`.
   
   The only remaining issue is that we can get concurrent reads of `connectionRatePerIp` while updating quota metrics, but we can just replace the `mutable.Map` with a `ConcurrentHashMap`, which is preferable to coarsely locking on `counts`.
   
   Let me know if I'm missing something here with respect to thread safety.
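
   A condensed sketch of the locking scheme described above (assumed names, not the PR code): the `counts` lock is held only while the quota value is written, and per-IP metrics that already exist are reconfigured afterwards outside the lock:

```scala
import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Quota}

// Sketch only: write the new default under the counts lock, then refresh existing
// per-IP metric configs outside of it.
object QuotaUpdateSketch {
  def updateDefaultIpRate(counts: AnyRef,
                          newRate: Int,
                          setDefaultRate: Int => Unit,
                          ipRateMetrics: Iterable[KafkaMetric]): Unit = {
    // inc() reads the quota and creates its sensor under this same lock, so once
    // the lock is released no sensor can be created with the old limit.
    counts.synchronized {
      setDefaultRate(newRate)
    }
    // Outside the lock: bring metrics created with the old limit up to the new bound.
    ipRateMetrics.foreach { metric =>
      if (metric.config.quota.bound != newRate)
        metric.config(new MetricConfig().quota(Quota.upperBound(newRate)))
    }
  }
}
```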





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-14 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r523682718



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -633,18 +837,27 @@ class ConnectionQuotasTest {
                          listenerName: ListenerName,
                          address: InetAddress,
                          numConnections: Long,
-                         timeIntervalMs: Long) : Unit = {
+                         timeIntervalMs: Long,
+                         expectIpThrottle: Boolean): Boolean = {
     var nextSendTime = System.currentTimeMillis + timeIntervalMs
+    var ipThrottled = false
     for (_ <- 0L until numConnections) {
       // this method may block if broker-wide or listener limit on the number of connections is reached
-      connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value))
-
+      try {
+        connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value))
+      } catch {
+        case e: ConnectionThrottledException =>
+          if (!expectIpThrottle)
+            throw e
+          ipThrottled = true

Review comment:
   I took a closer look at some of the IP throttling tests, and I think they are a little too coarse-grained with respect to verifying that the quota rate is enforced. I updated the IP throttling tests for rates above/below the quota that don't expect to block to use `acceptConnectionsAndVerifyRate` with `MockTime`, which verifies that the expected quota rate holds while being throttled.
   
   It's a little messy, because a lot of the other tests for broker/listener 
quotas have to use `Time.SYSTEM` due to monitor waiting. I think it should also 
be possible to rewrite the broker/listener connection rate quota tests to use 
mock time by extending the `Time` interface, but I would prefer to do that in a 
follow-up PR.
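
   For illustration, a rough sketch (helper names assumed, not the test code) of how a `MockTime`-driven rate check stays deterministic, since advancing the mock clock replaces real sleeps:

```scala
import org.apache.kafka.common.utils.MockTime

// Sketch only: measure an accept rate against mock time rather than the system clock.
object MockTimeRateSketch {
  def acceptAndMeasureRate(connectAttempts: Int,
                           intervalMs: Long,
                           acceptOne: () => Unit,
                           time: MockTime): Double = {
    val startMs = time.milliseconds()
    for (_ <- 0 until connectAttempts) {
      acceptOne()            // e.g. connectionQuotas.inc(...); may throw when throttled
      time.sleep(intervalMs) // advances the mocked clock instead of blocking the thread
    }
    val elapsedSec = (time.milliseconds() - startMs) / 1000.0
    connectAttempts / elapsedSec
  }
}
```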





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-14 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521566585



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -633,18 +837,27 @@ class ConnectionQuotasTest {
                          listenerName: ListenerName,
                          address: InetAddress,
                          numConnections: Long,
-                         timeIntervalMs: Long) : Unit = {
+                         timeIntervalMs: Long,
+                         expectIpThrottle: Boolean): Boolean = {
     var nextSendTime = System.currentTimeMillis + timeIntervalMs
+    var ipThrottled = false
     for (_ <- 0L until numConnections) {
       // this method may block if broker-wide or listener limit on the number of connections is reached
-      connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value))
-
+      try {
+        connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value))
+      } catch {
+        case e: ConnectionThrottledException =>
+          if (!expectIpThrottle)
+            throw e
+          ipThrottled = true

Review comment:
   I took a closer look at some of the IP throttling tests, and I think 
they are a little too coarse-grained with respect to testing the throttle rate. 
I updated the IP throttling tests that don't expect to block to instead use 
`acceptConnectionsAndVerifyRate` with `MockTime`.
   
   I think it should also be possible to update the broker/listener quota tests 
in `ConnectionQuotasTest` and `DynamicConnectionQuotaTest` to use mock time by 
extending the `Time` interface, but I would prefer to do that in a follow-up PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-14 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521549126



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
                               val recvBufferSize: Int,
                               brokerId: Int,
                               connectionQuotas: ConnectionQuotas,
-                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                              metricPrefix: String,
+                              time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   private val processors = new ArrayBuffer[Processor]()
   private val processorsStarted = new AtomicBoolean
   private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
     "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
+  private var currentProcessorIndex = 0
+  private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
+    override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs
+  }
+  private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()

Review comment:
   I explained this elsewhere, but my comment may have gotten lost in the weeds.
   
   For reference:
   
   I did not use a java `DelayQueue`, as in the `ClientQuotaManager` implementation, because this throttling implementation does not use timeout-based polling or require a synchronized data structure, and a `DelayQueue` needs a bit more boilerplate to use.
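
   For context, a self-contained sketch (assumed names, not the PR code) of the priority-queue approach: throttled sockets are enqueued by expiry time and drained by the single acceptor thread, which is why a blocking, thread-safe `DelayQueue` is not needed:

```scala
import java.nio.channels.SocketChannel
import scala.collection.mutable

// Sketch only: throttled sockets ordered by expiry, drained by the acceptor thread.
object ThrottledSocketSketch {
  final case class ThrottledSocket(socket: SocketChannel, endThrottleTimeMs: Long)

  // Reverse ordering so the socket whose throttle expires first sits at the head
  // of the (max-heap) priority queue.
  private val queue = mutable.PriorityQueue.empty[ThrottledSocket](
    Ordering.by[ThrottledSocket, Long](_.endThrottleTimeMs).reverse)

  def add(socket: SocketChannel, endThrottleTimeMs: Long): Unit =
    queue.enqueue(ThrottledSocket(socket, endThrottleTimeMs))

  // Called periodically from the acceptor loop with the current time.
  def closeExpired(nowMs: Long): Unit =
    while (queue.headOption.exists(_.endThrottleTimeMs <= nowMs))
      queue.dequeue().socket.close()
}
```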





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-11 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == ConnectionRateMetricName &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey(IpMetricTag)
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+counts.synchronized {

Review comment:
   After thinking on this a bit more, I think that only locking on updates 
to `defaultConnectionRatePerIp` should be sufficient for correctness.
   
   ZK dynamic config changes are processed within one thread, so we will only 
have one thread executing in `updateIpConnectionRateQuota`. If we want to be 
really careful about this, we can have `updateIpConnectionRateQuota` 
synchronized on `ConnectionQuotas`.
   
   The case we want to avoid by synchronizing on `counts` is that another 
thread reads `defaultConnectionRatePerIp` as connection rate limit `A` while 
calling `inc()`, then we update the connection rate and quota metrics to `B`, 
and then the other thread creates a sensor/metric with quota limit `A` => 
inconsistency.
   
   If we synchronize on `counts` only for updates to 
`connectionRateForIp`/`defaultConnectionRatePerIp`, we know that any thread 
that has read a connection rate quota as `A` will finish creating its quota 
metrics with limit `A` before we acquire the `counts` lock.
   
   After acquiring the `counts` lock and updating 
`connectionRateForIp/defaultConnectionRate` to `B`, any subsequent threads 
calling `inc()` will read the quota as `B` and create a metric as `B`. We will 
then be able to update any quota metrics from `A` to `B`, knowing that there 
are no operations that could have read the connection rate limit as `A` without 
already having created the quota metric.
   
   The only issue remaining is that we can get concurrent reads of 
`connectionRatePerIp` while updating quota metrics, but we can just replace 
`mutable.Map` with `ConcurrentHashMap` which is preferable to coarsely locking 
on `counts`.
   
   Let me know if I'm missing something here with respect to thread safety.
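   
   As a rough sketch of the pattern I'm describing (simplified field names, 
not the PR code):
   
   ```scala
   import java.net.InetAddress
   import java.util.concurrent.ConcurrentHashMap

   class IpRateQuotasSketch(counts: Object) {
     // Int.MaxValue standing in for "unlimited" is just an assumption here.
     @volatile private var defaultConnectionRatePerIp: Int = Int.MaxValue
     private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()

     // Called from inc(), which already holds the `counts` monitor: whatever
     // limit it reads here (A) is the limit its sensor/metric is created with.
     def connectionRateForIp(ip: InetAddress): Int =
       connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp)

     // Update path: once we hold `counts`, no in-flight inc() can still be
     // creating a metric with the old limit A, so rewriting metric configs to
     // B cannot race with a stale creation.
     def updateDefault(newRate: Int): Unit = counts.synchronized {
       defaultConnectionRatePerIp = newRate
       // ... then iterate existing per-IP quota metrics and update their configs to B
     }
   }
   ```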





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-11 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521545712



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == ConnectionRateMetricName &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey(IpMetricTag)
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+counts.synchronized {

Review comment:
   Yeah, it's undesirable behavior. However, it is in line with 
`ClientQuotaManager.updateQuota`, which takes a write lock when updating a 
quota and also has to iterate over all metrics for any default quota update.
   
   I can't think of a good alternative, but I'm open to suggestions.
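   
   For reference, the metric scan is roughly the following (simplified; the 
metric/tag names here are placeholders, the real predicate also checks the 
metrics group, and the real config also sets the sample window):
   
   ```scala
   import org.apache.kafka.common.MetricName
   import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}

   def updateDefaultIpQuotaConfigs(metrics: Metrics, newQuota: Int): Unit = {
     // Placeholder metric name and tag; the real code uses its own constants.
     def isIpConnectionRateMetric(name: MetricName): Boolean =
       name.name == "connection-accept-rate" && name.tags.containsKey("ip")

     metrics.metrics.forEach { (name, metric) =>
       // Only rewrite configs whose bound actually differs from the new default.
       if (isIpConnectionRateMetric(name) && metric.config.quota.bound != newQuota.toDouble)
         metric.config(new MetricConfig().quota(Quota.upperBound(newQuota.toDouble)))
     }
   }
   ```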





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-11 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521614527



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == ConnectionRateMetricName &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey(IpMetricTag)
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+counts.synchronized {
+  ip match {
+case Some(addr) =>
+  val address = InetAddress.getByName(addr)

Review comment:
   Moved IP resolution to `IpConfigHandler`.
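   
   Roughly, the handler now does something like this (hypothetical sketch; the 
default-entity marker, property key, and callback signature are assumptions, 
not the actual code):
   
   ```scala
   import java.net.InetAddress
   import java.util.Properties

   // Resolve/validate the IP string in the config handler so that
   // ConnectionQuotas only ever sees an Option[InetAddress] (None = default).
   def processIpConfigChange(entityName: String, config: Properties,
                             updateQuota: (Option[InetAddress], Option[Int]) => Unit): Unit = {
     val ip =
       if (entityName == "<default>") None              // assumed default-entity marker
       else Some(InetAddress.getByName(entityName))     // throws on an unresolvable address
     val rate = Option(config.getProperty("connection_creation_rate")).map(_.toInt) // assumed key
     updateQuota(ip, rate)
   }
   ```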





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-11 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521566585



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -633,18 +837,27 @@ class ConnectionQuotasTest {
 listenerName: ListenerName,
 address: InetAddress,
 numConnections: Long,
-timeIntervalMs: Long) : Unit = {
+timeIntervalMs: Long,
+expectIpThrottle: Boolean): Boolean = {
 var nextSendTime = System.currentTimeMillis + timeIntervalMs
+var ipThrottled = false
 for (_ <- 0L until numConnections) {
   // this method may block if broker-wide or listener limit on the number 
of connections is reached
-  connectionQuotas.inc(listenerName, address, 
blockedPercentMeters(listenerName.value))
-
+  try {
+connectionQuotas.inc(listenerName, address, 
blockedPercentMeters(listenerName.value))
+  } catch {
+case e: ConnectionThrottledException =>
+  if (!expectIpThrottle)
+throw e
+  ipThrottled = true

Review comment:
   A lot of the tests in `ConnectionQuotasTest` use system time and aim to 
reach a connection rate of N connections/sec, so `ipThrottled` may well remain 
true; what we want to verify is that we can hit the target connection rate 
even if connections in the middle of the interval get throttled.
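   
   Roughly, the helper paces attempts against system time like this 
(simplified sketch, not the actual test code; the pacing details are an 
assumption):
   
   ```scala
   // Attempt `numConnections` connections, waiting `timeIntervalMs` between
   // attempts, and report whether any attempt was throttled along the way.
   def acceptConnectionsAtRate(numConnections: Long, timeIntervalMs: Long,
                               expectIpThrottle: Boolean)(attempt: () => Unit): Boolean = {
     var nextSendTime = System.currentTimeMillis + timeIntervalMs
     var ipThrottled = false
     for (_ <- 0L until numConnections) {
       try attempt()
       catch {
         // Stand-in for ConnectionThrottledException: tolerated only when expected.
         case e: RuntimeException =>
           if (!expectIpThrottle) throw e
           ipThrottled = true
       }
       val sleepMs = nextSendTime - System.currentTimeMillis
       if (sleepMs > 0) Thread.sleep(sleepMs)            // pace to the target rate
       nextSendTime += timeIntervalMs
     }
     ipThrottled
   }
   ```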





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-11 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521564771



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -477,6 +568,99 @@ class ConnectionQuotasTest {
 assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to 
be recorded", blockedPercentMeters("EXTERNAL").count() > 0)
   }
 
+  @Test
+  def testIpConnectionRateUpdate(): Unit = {
+val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+connectionQuotas.addListener(config, listeners("ADMIN").listenerName)
+connectionQuotas.addListener(config, listeners("REPLICATION").listenerName)
+val defaultIpRate = 50
+val defaultOverrideRate = 20
+val overrideIpRate = 30
+val externalListener = listeners("EXTERNAL")
+val adminListener = listeners("ADMIN")
+// set a non-unlimited default quota so that we create ip rate 
sensors/metrics
+connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultIpRate))
+connectionQuotas.inc(externalListener.listenerName, 
externalListener.defaultIp, blockedPercentMeters("EXTERNAL"))
+connectionQuotas.inc(adminListener.listenerName, adminListener.defaultIp, 
blockedPercentMeters("ADMIN"))
+
+// both IPs should have the default rate
+verifyIpConnectionQuota(externalListener.defaultIp, defaultIpRate)
+verifyIpConnectionQuota(adminListener.defaultIp, defaultIpRate)
+
+// external listener should have its in-memory quota and metric config 
updated
+
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 Some(overrideIpRate))
+verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate)
+
+// update default
+connectionQuotas.updateIpConnectionRateQuota(None, 
Some(defaultOverrideRate))
+
+// external listener IP should not have its quota updated to the new 
default
+verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate)
+// admin listener IP should have its quota updated with to the new default
+verifyIpConnectionQuota(adminListener.defaultIp, defaultOverrideRate)
+
+// remove default connection rate quota
+connectionQuotas.updateIpConnectionRateQuota(None, None)
+verifyIpConnectionQuota(adminListener.defaultIp, 
DynamicConfig.Ip.DefaultConnectionCreationRate)
+
+// remove override for external listener IP
+
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 None)
+verifyIpConnectionQuota(externalListener.defaultIp, 
DynamicConfig.Ip.DefaultConnectionCreationRate)
+  }
+
+  @Test
+  def testIpConnectionRateQuotaUpdate(): Unit = {

Review comment:
   I renamed to `testIpConnectionRateMetricUpdate` and 
`testEnforcedIpConnectionRateQuotaUpdate` to clarify.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-06 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r519054824



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1659,19 +1648,22 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
* @param connectionQuotaEntity entity to create the sensor for
*/
   private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, 
connectionQuotaEntity: ConnectionQuotaEntity): Sensor = {
-sensorAccessor.getOrCreate(
-  connectionQuotaEntity.sensorName,
-  connectionQuotaEntity.sensorExpiration,
-  sensor => sensor.add(connectionRateMetricName(connectionQuotaEntity), 
new Rate, rateQuotaMetricConfig(quotaLimit))
-)
+Option(metrics.getSensor(connectionQuotaEntity.sensorName)).getOrElse {
+  val sensor = metrics.sensor(
+connectionQuotaEntity.sensorName,
+rateQuotaMetricConfig(quotaLimit),
+connectionQuotaEntity.sensorExpiration
+  )
+  sensor.add(connectionRateMetricName(connectionQuotaEntity), new Rate, 
null)
+  sensor
+}

Review comment:
   I removed the use of `sensorAccess`, since all calls to 
`sensorAccessor.getOrCreate` were performed while holding the `counts` lock, 
so the read-write lock is redundant.
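   
   In other words, the check-then-create only ever runs while holding the 
`counts` monitor, so it needs no additional locking (simplified sketch, not 
the PR code):
   
   ```scala
   import org.apache.kafka.common.metrics.{Metrics, Sensor}

   class SensorUnderCountsLockSketch(metrics: Metrics) {
     private val counts = new Object

     // Plain get-or-create; safe because every caller already holds `counts`.
     private def getOrCreateSensor(name: String): Sensor =
       Option(metrics.getSensor(name)).getOrElse(metrics.sensor(name))

     def inc(ipSensorName: String): Unit = counts.synchronized {
       getOrCreateSensor(ipSensorName).record()
     }
   }
   ```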





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-03 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r516997443



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
 close(endPoint.listenerName, socketChannel)
 None
+  case e: ConnectionThrottledException =>
+val ip = socketChannel.socket.getInetAddress
+debug(s"Delaying closing of connection from $ip for 
${e.throttleTimeMs} ms")
+val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new 
mutable.Queue[DelayedCloseSocket])

Review comment:
   Okay, I replaced the map/queue with a priority queue.
   I did not use the java `DelayQueue` similar to the implementation in 
`ClientQuotaManager` because this throttling implementation does not use 
timeout-based polling or require a synchronized data structure, and there's 
significantly more boilerplate needed for a `DelayQueue`
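   
   For comparison, a `DelayQueue`-based version would need roughly this much 
extra boilerplate (illustration only, not code from the PR):
   
   ```scala
   import java.nio.channels.SocketChannel
   import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

   // DelayQueue elements must implement Delayed, i.e. both getDelay and
   // compareTo, re-expressed against wall-clock time on every call.
   class ThrottledSocket(val socket: SocketChannel, endThrottleTimeMs: Long) extends Delayed {
     override def getDelay(unit: TimeUnit): Long =
       unit.convert(endThrottleTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
     override def compareTo(other: Delayed): Int =
       java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
   }

   val throttled = new DelayQueue[ThrottledSocket]()

   def delayClose(socket: SocketChannel, endThrottleTimeMs: Long): Unit =
     throttled.offer(new ThrottledSocket(socket, endThrottleTimeMs))

   // poll() only hands back expired elements, and the queue is internally
   // synchronized -- neither of which the single acceptor thread needs.
   def closeExpired(): Unit = {
     var expired = throttled.poll()
     while (expired != null) {
       expired.socket.close()
       expired = throttled.poll()
     }
   }
   ```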





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-03 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r516995083



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1207,14 +1286,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, 
ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = 
DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = 
createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = 
getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, 
BrokerQuotaEntity)
   private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
   def inc(listenerName: ListenerName, address: InetAddress, 
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
 counts.synchronized {
-  waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+  val startThrottleTimeMs = time.milliseconds
+
+  waitForConnectionSlot(listenerName, startThrottleTimeMs, 
acceptorBlockedPercentMeter)
+
+  val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, 
startThrottleTimeMs)

Review comment:
   seems reasonable





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-03 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r516301423



##
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##
@@ -240,6 +256,16 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   s"Admin client connection not closed (initial = $initialConnectionCount, 
current = $connectionCount)")
   }
 
+  private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): 
Unit = {
+adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),
+  CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, 
updatedRate.toString))
+// use a random throwaway address if ip isn't specified to get the default 
value
+TestUtils.waitUntilTrue(() => servers.head.socketServer.connectionQuotas.
+  connectionRateForIp(InetAddress.getByName(ip.getOrElse("255.255.3.4"))) 
== updatedRate,

Review comment:
   This is admittedly a little weird.
   If `None` is given as the IP, we want to update the default connection rate. 
   To verify that the default connection rate was updated, we need to call 
`connectionRateForIp` with some arbitrary IP address that hasn't been given a 
specific override. In this case, I used an arbitrary IP, `255.255.3.4`, as 
mentioned in the comment.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == ConnectionRateMetricName &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey(IpMetricTag)
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+
+ip match {
+  case Some(addr) =>
+val address = InetAddress.getByName(addr)
+maxConnectionRate match {
+  case Some(rate) =>
+info(s"Updating max connection rate override for $address to 
$rate")
+connectionRatePerIp.put(address, rate)
+  case None =>
+info(s"Removing max connection rate override for $address")
+connectionRatePerIp.remove(address)
+}
+updateConnectionRateQuota(connectionRateForIp(address), 
IpQuotaEntity(address))
+  case None =>
+val newQuota = 
maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+info(s"Updating default max IP connection rate to $newQuota")
+defaultConnectionRatePerIp = newQuota
+val allMetrics = metrics.metrics
+allMetrics.forEach { (metricName, metric) =>
+  if (isIpConnectionRateMetric(metricName) && 
shouldUpdateQuota(metric, newQuota)) {
+info(s"Updating existing connection rate sensor for 
${metricName.tags} to $newQuota")
+metric.config(rateQuotaMetricConfig(newQuota))
+  }

Review comment:
   @dajac 
   Good catch, we shouldn't be using `newQuota` here. I agree that this should 
have been covered by testing; I'll work on adding some more unit tests for the 
metric config updating.
   
   @apovzner 
   It should handle that case, yeah.
   If the default is set to some value and we then remove the quota for an IP, 
e.g. `updateConnectionRate(Some(ip), None)`, we remove the connection rate 
entry from the map and then call `getOrDefault(ip, defaultConnectionRatePerIp)`, 
which returns whatever the non-unlimited per-IP default quota is.
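   
   A minimal sketch of that fallback (assumed values, not the actual defaults):
   
   ```scala
   import java.net.InetAddress
   import java.util.concurrent.ConcurrentHashMap

   object IpQuotaFallbackSketch extends App {
     val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
     val defaultConnectionRatePerIp = 50                 // assumed non-unlimited default

     def connectionRateForIp(ip: InetAddress): Int =
       connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp)

     val ip = InetAddress.getByName("10.0.0.1")          // arbitrary example IP
     connectionRatePerIp.put(ip, 30)                     // per-IP override
     connectionRatePerIp.remove(ip)                      // updateIpConnectionRate(Some(ip), None)
     assert(connectionRateForIp(ip) == defaultConnectionRatePerIp) // falls back to the default
   }
   ```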





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org