This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new e2136746159 KAFKA-14225; Fix deadlock caused by lazy val exemptSensor 
(#12634)
e2136746159 is described below

commit e2136746159915c3cf66df0705bac65ebc6a2aa6
Author: Huilin Shi <h...@linkedin.com>
AuthorDate: Tue Oct 11 10:30:07 2022 -0700

    KAFKA-14225; Fix deadlock caused by lazy val exemptSensor (#12634)
    
    A deadlock can occur when multiple threads access 
ClientRequestQuotaManager. In Scala 2.12, lazy val 
initialization runs while holding the object lock. The deadlock can happen under the 
following conditions:
    
    In thread a, when ClientRequestQuotaManager.exemptSensor is being 
initialized, the thread has acquired the object lock and entered the actual 
initialization block. The initialization of 'exemptSensor' requires another 
lock, `private val lock = new ReentrantReadWriteLock()`, and the thread is waiting for this 
lock.
    
    In thread b, at the same time, ClientQuotaManager.updateQuota() is called 
and it has already acquired the ReentrantReadWriteLock by calling 
lock.writeLock().lock(). It then executes info(). If this is the first access 
to Logging.logger, which is also a lazy val, the thread needs to wait for the 
object lock.
    
    Each thread now holds the lock the other one is waiting for, so the deadlock happens.
    
    Since the lazy val initialization is under the object lock, we should avoid 
using lazy val if the initialization function acquires another lock, to prevent 
holding two locks at the same time, which is prone to deadlock. This change 
creates exemptSensor during ClientRequestQuotaManager initialization with an 
expiration time of Long.MaxValue to prevent expiration if request quota is not 
enabled at that time.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/server/ClientQuotaManager.scala        | 7 +++----
 core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala | 9 +++++++--
 core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala    | 3 ++-
 .../test/scala/integration/kafka/api/PlaintextConsumerTest.scala | 7 -------
 4 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index ee7c70bec93..45c7c31570f 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -433,12 +433,11 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       .quota(new Quota(quotaLimit, true))
   }
 
-  protected def getOrCreateSensor(sensorName: String, metricName: MetricName): 
Sensor = {
+  protected def getOrCreateSensor(sensorName: String, expirationTimeSeconds: 
Long, registerMetrics: Sensor => Unit): Sensor = {
     sensorAccessor.getOrCreate(
       sensorName,
-      ClientQuotaManager.InactiveSensorExpirationTimeSeconds,
-      sensor => sensor.add(metricName, new Rate)
-    )
+      expirationTimeSeconds,
+      registerMetrics)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 2ceaab9c9af..6e57d97bc3e 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -17,11 +17,11 @@
 package kafka.server
 
 import java.util.concurrent.TimeUnit
-
 import kafka.network.RequestChannel
 import kafka.utils.QuotaUtils
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.quota.ClientQuotaCallback
 
@@ -30,6 +30,9 @@ import scala.jdk.CollectionConverters._
 object ClientRequestQuotaManager {
   val QuotaRequestPercentDefault = Int.MaxValue.toDouble
   val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
+  // Since exemptSensor is for all clients and has a constant name, we do not 
expire exemptSensor and only
+  // create once.
+  val DefaultInactiveExemptSensorExpirationTimeSeconds = Long.MaxValue
 
   private val ExemptSensorName = "exempt-" + QuotaType.Request
 }
@@ -45,7 +48,9 @@ class ClientRequestQuotaManager(private val config: 
ClientQuotaManagerConfig,
   private val exemptMetricName = metrics.metricName("exempt-request-time",
     QuotaType.Request.toString, "Tracking exempt-request-time utilization 
percentage")
 
-  lazy val exemptSensor: Sensor = 
getOrCreateSensor(ClientRequestQuotaManager.ExemptSensorName, exemptMetricName)
+  val exemptSensor: Sensor = 
getOrCreateSensor(ClientRequestQuotaManager.ExemptSensorName,
+    ClientRequestQuotaManager.DefaultInactiveExemptSensorExpirationTimeSeconds,
+    sensor => sensor.add(exemptMetricName, new Rate))
 
   def recordExempt(value: Double): Unit = {
     exemptSensor.record(value)
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 40d4cef7f82..457a1ddd48d 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -173,7 +173,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
     consumer.subscribe(Collections.singleton(topic1))
     val endTimeMs = System.currentTimeMillis + 10000
     var throttled = false
-    while ((!throttled || quotaTestClients.exemptRequestMetric == null) && 
System.currentTimeMillis < endTimeMs) {
+    while ((!throttled || quotaTestClients.exemptRequestMetric == null || 
metricValue(quotaTestClients.exemptRequestMetric) <= 0)
+      && System.currentTimeMillis < endTimeMs) {
       consumer.poll(Duration.ofMillis(100L))
       val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request, 
consumerClientId)
       throttled = throttleMetric != null && metricValue(throttleMetric) > 0
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 5dc7c2ada1e..92670dba3b3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -637,7 +637,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(0L, consumer.position(tp), "position() on a partition that we 
are subscribed to should reset the offset")
     consumer.commitSync()
     assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset)
-    
     consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0, startingTimestamp = startingTimestamp)
     assertEquals(5L, consumer.position(tp), "After consuming 5 records, 
position should be 5")
     consumer.commitSync()
@@ -1642,12 +1641,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
     servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
     servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))
-
-    def assertNoExemptRequestMetric(broker: KafkaServer): Unit = {
-        val metricName = broker.metrics.metricName("exempt-request-time", 
QuotaType.Request.toString, "")
-        assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
-    }
-    servers.foreach(assertNoExemptRequestMetric)
   }
 
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {

Reply via email to