This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 02c102ff457 KAFKA-19294: Fix BrokerLifecycleManager RPC timeouts (#19745)
02c102ff457 is described below
commit 02c102ff45734c043cde0a813c755d617c758f11
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue Jun 24 16:23:25 2025 -0700
KAFKA-19294: Fix BrokerLifecycleManager RPC timeouts (#19745)
Previously, the broker could wait for up to half of the broker session timeout
for a heartbeat RPC to complete, and then back off for up to half of the broker
session timeout before retrying. Taken together, these two delays could cause
brokers to miss heartbeats erroneously.
This change removes exponential backoff for heartbeats sent from the
broker to the controller; after a failed communication attempt, the broker
now retries after one heartbeat interval. The load caused by heartbeats is
not heavy, and controllers can easily time out heartbeats when their queue
grows too long. Additionally, we now set the maximum RPC time to the broker
heartbeat interval, which minimizes the impact of heavy load.
Reviewers: José Armando García Sancio <[email protected]>, David Arthur
<[email protected]>
Conflicts:
- BrokerLifecycleManager.scala: fixed a minor conflict when removing the
ExponentialBackoff object
---
.../scala/kafka/server/BrokerLifecycleManager.scala | 21 +++------------------
core/src/main/scala/kafka/server/BrokerServer.scala | 2 +-
2 files changed, 4 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 51bc16fb09d..6670cccc37c 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{BrokerHeartbeatRequest,
BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.queue.EventQueue.DeadlineFunction
-import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
@@ -93,18 +93,6 @@ class BrokerLifecycleManager(
private val initialTimeoutNs =
MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs.longValue())
- /**
- * The exponential backoff to use for resending communication.
- */
- private val resendExponentialBackoff =
- new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong, 0.02)
-
- /**
- * The number of times we've tried and failed to communicate. This variable
can only be
- * read or written from the BrokerToControllerRequestThread.
- */
- private var failedAttempts = 0L
-
/**
* The broker incarnation ID. This ID uniquely identifies each time we
start the broker
*/
@@ -449,7 +437,6 @@ class BrokerLifecycleManager(
val message =
response.responseBody().asInstanceOf[BrokerRegistrationResponse]
val errorCode = Errors.forCode(message.data().errorCode())
if (errorCode == Errors.NONE) {
- failedAttempts = 0
_brokerEpoch = message.data().brokerEpoch()
registered = true
initialRegistrationSucceeded = true
@@ -523,7 +510,6 @@ class BrokerLifecycleManager(
val errorCode = Errors.forCode(message.data().errorCode())
if (errorCode == Errors.NONE) {
val responseData = message.data()
- failedAttempts = 0
currentOfflineDirs.foreach(cur => offlineDirs.put(cur, true))
_state match {
case BrokerState.STARTING =>
@@ -586,10 +572,9 @@ class BrokerLifecycleManager(
}
private def scheduleNextCommunicationAfterFailure(): Unit = {
- val delayMs = resendExponentialBackoff.backoff(failedAttempts)
- failedAttempts = failedAttempts + 1
nextSchedulingShouldBeImmediate = false // never immediately reschedule
after a failure
- scheduleNextCommunication(NANOSECONDS.convert(delayMs, MILLISECONDS))
+ scheduleNextCommunication(NANOSECONDS.convert(
+ config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS))
}
private def scheduleNextCommunicationAfterSuccess(): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 49b4fc86998..738884f58b5 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -365,7 +365,7 @@ class BrokerServer(
config,
"heartbeat",
s"broker-${config.nodeId}-",
- config.brokerSessionTimeoutMs / 2 // KAFKA-14392
+ config.brokerHeartbeatIntervalMs
)
lifecycleManager.start(
() => sharedServer.loader.lastAppliedOffset(),