This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 984c5eda7c0707577e26890b7b6018f86d78194e Author: Ron Dagostino <[email protected]> AuthorDate: Fri Dec 9 20:00:19 2022 -0500 KAFKA-14392: Fix overly long request timeouts in BrokerToControllerChannelManager (#12856) In BrokerToControllerChannelManager, set the request timeout to the minimum of the retry timeout and the controller socket timeout. This fixes some cases where we were unintentionally setting an overly long request timeout. Also, the channel manager used by the BrokerLifecycleManager should set a retry timeout equal to half of the broker session timeout, rather than the entire broker session timeout, to allow for a retransmission if the initial attempt fails. These two fixes should address some cases where broker heartbeat requests were not being resent in a timely fashion after a network glitch. Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio <[email protected]> --- core/src/main/scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/BrokerToControllerChannelManager.scala | 4 ++-- core/src/main/scala/kafka/server/ControllerApis.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index f55ceebffcc..a83da6e0258 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -338,7 +338,7 @@ class BrokerServer( config, "heartbeat", threadNamePrefix, - config.brokerSessionTimeoutMs.toLong + config.brokerSessionTimeoutMs / 2 // KAFKA-14392 ) lifecycleManager.start( () => metadataListener.highestMetadataOffset, diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index 99e86722f2f..92754a793f5 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ 
b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -211,7 +211,7 @@ class BrokerToControllerChannelManagerImpl( 50, Selectable.USE_DEFAULT_BUFFER_SIZE, Selectable.USE_DEFAULT_BUFFER_SIZE, - config.requestTimeoutMs, + Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, // request timeout should not exceed the provided retry timeout config.connectionSetupTimeoutMs, config.connectionSetupTimeoutMaxMs, time, @@ -283,7 +283,7 @@ class BrokerToControllerRequestThread( time: Time, threadName: String, retryTimeoutMs: Long -) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) { +) extends InterBrokerSendThread(threadName, networkClient, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) { private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]() private val activeController = new AtomicReference[Node](null) diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 657c2965533..1c3586263a7 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -576,7 +576,7 @@ class ControllerApis(val requestChannel: RequestChannel, val heartbeatRequest = request.body[BrokerHeartbeatRequest] authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) val context = new ControllerRequestContext(request.context.header.data, request.context.principal, - requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs)) + requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs / 2)) controller.processBrokerHeartbeat(context, heartbeatRequest.data).handle[Unit] { (reply, e) => def createResponseCallback(requestThrottleMs: Int, reply: BrokerHeartbeatReply,
