This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new e2136746159 KAFKA-14225; Fix deadlock caused by lazy val exemptSensor (#12634) e2136746159 is described below commit e2136746159915c3cf66df0705bac65ebc6a2aa6 Author: Huilin Shi <h...@linkedin.com> AuthorDate: Tue Oct 11 10:30:07 2022 -0700 KAFKA-14225; Fix deadlock caused by lazy val exemptSensor (#12634) A deadlock can occur when multiple threads access ClientRequestQuotaManager. In Scala 2.12, lazy val initialization is performed while holding the object lock. The deadlock can happen under the following conditions: In thread a, when ClientRequestQuotaManager.exemptSensor is being initialized, it has acquired the object lock and entered the actual initialization block. The initialization of 'exemptSensor' requires another lock, private val lock = new ReentrantReadWriteLock(), and it is waiting for this lock. In thread b, at the same time, ClientQuotaManager.updateQuota() is called and it has already acquired the ReentrantReadWriteLock by calling lock.writeLock().lock(). It then executes info(). If this is the first access to Logging.logger, which is also a lazy val, it needs to wait for the object lock, and the deadlock occurs. Since lazy val initialization holds the object lock, we should avoid using a lazy val whose initialization function acquires another lock: holding two locks at the same time is prone to deadlock. This change creates exemptSensor during ClientRequestQuotaManager initialization with an expiration time of Long.MaxValue, so that the sensor does not expire if the request quota is not enabled at that time. 
Reviewers: Jason Gustafson <ja...@confluent.io> --- core/src/main/scala/kafka/server/ClientQuotaManager.scala | 7 +++---- core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala | 9 +++++++-- core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala | 3 ++- .../test/scala/integration/kafka/api/PlaintextConsumerTest.scala | 7 ------- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index ee7c70bec93..45c7c31570f 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -433,12 +433,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, .quota(new Quota(quotaLimit, true)) } - protected def getOrCreateSensor(sensorName: String, metricName: MetricName): Sensor = { + protected def getOrCreateSensor(sensorName: String, expirationTimeSeconds: Long, registerMetrics: Sensor => Unit): Sensor = { sensorAccessor.getOrCreate( sensorName, - ClientQuotaManager.InactiveSensorExpirationTimeSeconds, - sensor => sensor.add(metricName, new Rate) - ) + expirationTimeSeconds, + registerMetrics) } /** diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala index 2ceaab9c9af..6e57d97bc3e 100644 --- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala @@ -17,11 +17,11 @@ package kafka.server import java.util.concurrent.TimeUnit - import kafka.network.RequestChannel import kafka.utils.QuotaUtils import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ +import org.apache.kafka.common.metrics.stats.Rate import org.apache.kafka.common.utils.Time import org.apache.kafka.server.quota.ClientQuotaCallback @@ -30,6 +30,9 @@ import 
scala.jdk.CollectionConverters._ object ClientRequestQuotaManager { val QuotaRequestPercentDefault = Int.MaxValue.toDouble val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1) + // Since exemptSensor is for all clients and has a constant name, we do not expire exemptSensor and only + // create once. + val DefaultInactiveExemptSensorExpirationTimeSeconds = Long.MaxValue private val ExemptSensorName = "exempt-" + QuotaType.Request } @@ -45,7 +48,9 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, private val exemptMetricName = metrics.metricName("exempt-request-time", QuotaType.Request.toString, "Tracking exempt-request-time utilization percentage") - lazy val exemptSensor: Sensor = getOrCreateSensor(ClientRequestQuotaManager.ExemptSensorName, exemptMetricName) + val exemptSensor: Sensor = getOrCreateSensor(ClientRequestQuotaManager.ExemptSensorName, + ClientRequestQuotaManager.DefaultInactiveExemptSensorExpirationTimeSeconds, + sensor => sensor.add(exemptMetricName, new Rate)) def recordExempt(value: Double): Unit = { exemptSensor.record(value) diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index 40d4cef7f82..457a1ddd48d 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -173,7 +173,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { consumer.subscribe(Collections.singleton(topic1)) val endTimeMs = System.currentTimeMillis + 10000 var throttled = false - while ((!throttled || quotaTestClients.exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) { + while ((!throttled || quotaTestClients.exemptRequestMetric == null || metricValue(quotaTestClients.exemptRequestMetric) <= 0) + && System.currentTimeMillis < endTimeMs) { consumer.poll(Duration.ofMillis(100L)) val throttleMetric = 
quotaTestClients.throttleMetric(QuotaType.Request, consumerClientId) throttled = throttleMetric != null && metricValue(throttleMetric) > 0 diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 5dc7c2ada1e..92670dba3b3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -637,7 +637,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(0L, consumer.position(tp), "position() on a partition that we are subscribed to should reset the offset") consumer.commitSync() assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset) - consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0, startingTimestamp = startingTimestamp) assertEquals(5L, consumer.position(tp), "After consuming 5 records, position should be 5") consumer.commitSync() @@ -1642,12 +1641,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId)) servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId)) servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId)) - - def assertNoExemptRequestMetric(broker: KafkaServer): Unit = { - val metricName = broker.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "") - assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName) - } - servers.foreach(assertNoExemptRequestMetric) } def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {