Repository: kafka
Updated Branches:
  refs/heads/trunk 272956f03 -> 84d2b6a01


MINOR: Simplify SensorAccess usage

I was investigating an exception in this code and found a few
opportunities for making it clearer.

I also added the `out` folder to `.gitignore` as IntelliJ sometimes
uses that as the build folder.

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>

Closes #3552 from ijuma/minor-quota-improvements


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84d2b6a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84d2b6a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84d2b6a0

Branch: refs/heads/trunk
Commit: 84d2b6a01c0fd71ee18fdf2364d85473d78c2b2f
Parents: 272956f
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Thu Jul 20 14:01:11 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Thu Jul 20 14:01:11 2017 +0100

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 .../scala/kafka/server/ClientQuotaManager.scala | 30 +++++++++-----------
 .../kafka/server/ReplicationQuotaManager.scala  | 13 ++++-----
 .../main/scala/kafka/server/SensorAccess.scala  | 21 ++++++--------
 4 files changed, 29 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/84d2b6a0/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index e8503ff..e12082e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@ dist
 target/
 build/
 build_eclipse/
+out/
 .gradle/
 lib_managed/
 src_managed/

http://git-wip-us.apache.org/repos/asf/kafka/blob/84d2b6a0/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 3970a4b..e1d5249 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -138,10 +138,12 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
                          private val time: Time) extends Logging {
   private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
   private val staticConfigClientIdQuota = 
Quota.upperBound(config.quotaBytesPerSecondDefault)
-  @volatile private var quotaTypesEnabled = if 
(config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas else 
QuotaTypes.ClientIdQuotaEnabled
+  @volatile private var quotaTypesEnabled =
+    if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas
+    else QuotaTypes.ClientIdQuotaEnabled
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
-  private val sensorAccessor = new SensorAccess
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
 
   private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
@@ -392,24 +394,19 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       sensorAccessor.getOrCreate(
         getQuotaSensorName(clientQuotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        lock, metrics,
-        () => clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.clientId),
-        () => getQuotaMetricConfig(clientQuotaEntity.quota),
-        () => measurableStat
+        clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.clientId),
+        Some(getQuotaMetricConfig(clientQuotaEntity.quota)),
+        new Rate
       ),
       
sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        lock,
-        metrics,
-        () => throttleMetricName(clientQuotaEntity),
-        () => null,
-        () => new Avg()
+        throttleMetricName(clientQuotaEntity),
+        None,
+        new Avg
       )
     )
   }
 
-  private def measurableStat: MeasurableStat = new Rate()
-
   private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType 
+ "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + 
quotaId.clientId.getOrElse("")
 
   private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + 
quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
@@ -425,10 +422,9 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     sensorAccessor.getOrCreate(
         sensorName,
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        lock, metrics,
-        () => metricName,
-        () => null,
-        () => measurableStat
+        metricName,
+        None,
+        new Rate
       )
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/84d2b6a0/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala 
b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index 4a87dfb..84004e3 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -74,8 +74,9 @@ class ReplicationQuotaManager(val config: 
ReplicationQuotaManagerConfig,
   private val lock = new ReentrantReadWriteLock()
   private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]()
   private var quota: Quota = null
-  private val sensorAccess = new SensorAccess
-  private val rateMetricName = metrics.metricName("byte-rate", 
replicationType.toString, s"Tracking byte-rate for ${replicationType}")
+  private val sensorAccess = new SensorAccess(lock, metrics)
+  private val rateMetricName = metrics.metricName("byte-rate", 
replicationType.toString,
+    s"Tracking byte-rate for ${replicationType}")
 
   /**
     * Update the quota
@@ -194,11 +195,9 @@ class ReplicationQuotaManager(val config: 
ReplicationQuotaManagerConfig,
     sensorAccess.getOrCreate(
       replicationType.toString,
       InactiveSensorExpirationTimeSeconds,
-      lock,
-      metrics,
-      () => rateMetricName,
-      () => getQuotaMetricConfig(quota),
-      () => new SimpleRate()
+      rateMetricName,
+      Some(getQuotaMetricConfig(quota)),
+      new SimpleRate
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/84d2b6a0/core/src/main/scala/kafka/server/SensorAccess.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/SensorAccess.scala 
b/core/src/main/scala/kafka/server/SensorAccess.scala
index 3a0130b..49e1fc0 100644
--- a/core/src/main/scala/kafka/server/SensorAccess.scala
+++ b/core/src/main/scala/kafka/server/SensorAccess.scala
@@ -16,10 +16,10 @@
   */
 package kafka.server
 
-import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.locks.ReadWriteLock
 
 import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{Metrics, Sensor, MeasurableStat, 
MetricConfig}
+import org.apache.kafka.common.metrics.{MeasurableStat, MetricConfig, Metrics, 
Sensor}
 
 /**
   * Class which centralises the logic for creating/accessing sensors.
@@ -27,9 +27,10 @@ import org.apache.kafka.common.metrics.{Metrics, Sensor, 
MeasurableStat, MetricC
   *
   * The later arguments are passed as methods as they are only called when the 
sensor is instantiated.
   */
-class SensorAccess {
+class SensorAccess(lock: ReadWriteLock, metrics: Metrics) {
 
-  def getOrCreate(sensorName: String, expirationTime: Long, lock: 
ReentrantReadWriteLock, metrics: Metrics, metricName: () => MetricName, config: 
() => MetricConfig, measure: () => MeasurableStat): Sensor = {
+  def getOrCreate(sensorName: String, expirationTime: Long,
+                  metricName: => MetricName, config: => Option[MetricConfig], 
measure: => MeasurableStat): Sensor = {
     var sensor: Sensor = null
 
     /* Acquire the read lock to fetch the sensor. It is safe to call getSensor 
from multiple threads.
@@ -41,12 +42,8 @@ class SensorAccess {
      * at which point it is safe to read
      */
     lock.readLock().lock()
-    try {
-      sensor = metrics.getSensor(sensorName)
-    }
-    finally {
-      lock.readLock().unlock()
-    }
+    try sensor = metrics.getSensor(sensorName)
+    finally lock.readLock().unlock()
 
     /* If the sensor is null, try to create it else return the existing sensor
      * The sensor can be null, hence the null checks
@@ -64,8 +61,8 @@ class SensorAccess {
         // ensure that we initialise `ClientSensors` with non-null parameters.
         sensor = metrics.getSensor(sensorName)
         if (sensor == null) {
-          sensor = metrics.sensor(sensorName, config(), expirationTime)
-          sensor.add(metricName(), measure())
+          sensor = metrics.sensor(sensorName, config.orNull, expirationTime)
+          sensor.add(metricName, measure)
         }
       } finally {
         lock.writeLock().unlock()

Reply via email to