Repository: kafka Updated Branches: refs/heads/trunk 272956f03 -> 84d2b6a01
MINOR: Simplify SensorAccess usage I was investigating an exception in this code and found a few opportunities for making it clearer. I also added the `out` folder to `.gitignore` as IntelliJ sometimes uses that as the build folder. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> Closes #3552 from ijuma/minor-quota-improvements Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84d2b6a0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84d2b6a0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84d2b6a0 Branch: refs/heads/trunk Commit: 84d2b6a01c0fd71ee18fdf2364d85473d78c2b2f Parents: 272956f Author: Ismael Juma <ism...@juma.me.uk> Authored: Thu Jul 20 14:01:11 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Thu Jul 20 14:01:11 2017 +0100 ---------------------------------------------------------------------- .gitignore | 1 + .../scala/kafka/server/ClientQuotaManager.scala | 30 +++++++++----------- .../kafka/server/ReplicationQuotaManager.scala | 13 ++++----- .../main/scala/kafka/server/SensorAccess.scala | 21 ++++++-------- 4 files changed, 29 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/84d2b6a0/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index e8503ff..e12082e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ dist target/ build/ build_eclipse/ +out/ .gradle/ lib_managed/ src_managed/ http://git-wip-us.apache.org/repos/asf/kafka/blob/84d2b6a0/core/src/main/scala/kafka/server/ClientQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 3970a4b..e1d5249 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -138,10 +138,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private val time: Time) extends Logging { private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]() private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault) - @volatile private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled + @volatile private var quotaTypesEnabled = + if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas + else QuotaTypes.ClientIdQuotaEnabled private val lock = new ReentrantReadWriteLock() private val delayQueue = new DelayQueue[ThrottledResponse]() - private val sensorAccessor = new SensorAccess + private val sensorAccessor = new SensorAccess(lock, metrics) val throttledRequestReaper = new ThrottledRequestReaper(delayQueue) private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue") @@ -392,24 +394,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, sensorAccessor.getOrCreate( getQuotaSensorName(clientQuotaEntity.quotaId), ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, - lock, metrics, - () => clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId), - () => getQuotaMetricConfig(clientQuotaEntity.quota), - () => measurableStat + clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId), + Some(getQuotaMetricConfig(clientQuotaEntity.quota)), + new Rate ), sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId), ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, - lock, - metrics, - () => throttleMetricName(clientQuotaEntity), - () => null, - () => new Avg() + throttleMetricName(clientQuotaEntity), + None, + new Avg ) ) } - private def measurableStat: MeasurableStat = new Rate() - private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("") private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("") @@ -425,10 +422,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, sensorAccessor.getOrCreate( sensorName, ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, - lock, metrics, - () => metricName, - () => null, - () => measurableStat + metricName, + None, + new Rate ) } http://git-wip-us.apache.org/repos/asf/kafka/blob/84d2b6a0/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala index 4a87dfb..84004e3 100644 --- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala @@ -74,8 +74,9 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, private val lock = new ReentrantReadWriteLock() private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]() private var quota: Quota = null - private val sensorAccess = new SensorAccess - private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString, s"Tracking byte-rate for ${replicationType}") + private val sensorAccess = new SensorAccess(lock, metrics) + private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString, + s"Tracking byte-rate for ${replicationType}") /** * Update the quota @@ -194,11 +195,9 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, sensorAccess.getOrCreate( replicationType.toString, InactiveSensorExpirationTimeSeconds, - lock, - metrics, - () => rateMetricName, - () => getQuotaMetricConfig(quota), - () => new SimpleRate() + rateMetricName, + Some(getQuotaMetricConfig(quota)), + new SimpleRate ) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/84d2b6a0/core/src/main/scala/kafka/server/SensorAccess.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/SensorAccess.scala b/core/src/main/scala/kafka/server/SensorAccess.scala index 3a0130b..49e1fc0 100644 --- a/core/src/main/scala/kafka/server/SensorAccess.scala +++ b/core/src/main/scala/kafka/server/SensorAccess.scala @@ -16,10 +16,10 @@ */ package kafka.server -import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.concurrent.locks.ReadWriteLock import org.apache.kafka.common.MetricName -import org.apache.kafka.common.metrics.{Metrics, Sensor, MeasurableStat, MetricConfig} +import org.apache.kafka.common.metrics.{MeasurableStat, MetricConfig, Metrics, Sensor} /** * Class which centralises the logic for creating/accessing sensors. @@ -27,9 +27,10 @@ import org.apache.kafka.common.metrics.{Metrics, Sensor, MeasurableStat, MetricC * * The later arguments are passed as methods as they are only called when the sensor is instantiated. */ -class SensorAccess { +class SensorAccess(lock: ReadWriteLock, metrics: Metrics) { - def getOrCreate(sensorName: String, expirationTime: Long, lock: ReentrantReadWriteLock, metrics: Metrics, metricName: () => MetricName, config: () => MetricConfig, measure: () => MeasurableStat): Sensor = { + def getOrCreate(sensorName: String, expirationTime: Long, + metricName: => MetricName, config: => Option[MetricConfig], measure: => MeasurableStat): Sensor = { var sensor: Sensor = null /* Acquire the read lock to fetch the sensor. It is safe to call getSensor from multiple threads. @@ -41,12 +42,8 @@ class SensorAccess { * at which point it is safe to read */ lock.readLock().lock() - try { - sensor = metrics.getSensor(sensorName) - } - finally { - lock.readLock().unlock() - } + try sensor = metrics.getSensor(sensorName) + finally lock.readLock().unlock() /* If the sensor is null, try to create it else return the existing sensor * The sensor can be null, hence the null checks @@ -64,8 +61,8 @@ class SensorAccess { // ensure that we initialise `ClientSensors` with non-null parameters. sensor = metrics.getSensor(sensorName) if (sensor == null) { - sensor = metrics.sensor(sensorName, config(), expirationTime) - sensor.add(metricName(), measure()) + sensor = metrics.sensor(sensorName, config.orNull, expirationTime) + sensor.add(metricName, measure) } } finally { lock.writeLock().unlock()