This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 984c5eda7c0707577e26890b7b6018f86d78194e
Author: Ron Dagostino <[email protected]>
AuthorDate: Fri Dec 9 20:00:19 2022 -0500

    KAFKA-14392: Fix overly long request timeouts in BrokerToControllerChannelManager (#12856)
    
    In BrokerToControllerChannelManager, set the request timeout to the minimum of the retry timeout
    and the controller socket timeout. This fixes some cases where we were unintentionally setting an
    overly long request timeout.
    
    Also, the channel manager used by the BrokerLifecycleManager should set a retry timeout equal to
    half of the broker session timeout, rather than the entire broker session timeout, to allow for a
    retransmission if the initial attempt fails.
    
    These two fixes should address some cases where broker heartbeat requests were not being resent
    in a timely fashion after a network glitch.
    
    Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio <[email protected]>
---
 core/src/main/scala/kafka/server/BrokerServer.scala                   | 2 +-
 .../main/scala/kafka/server/BrokerToControllerChannelManager.scala    | 4 ++--
 core/src/main/scala/kafka/server/ControllerApis.scala                 | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index f55ceebffcc..a83da6e0258 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -338,7 +338,7 @@ class BrokerServer(
         config,
         "heartbeat",
         threadNamePrefix,
-        config.brokerSessionTimeoutMs.toLong
+        config.brokerSessionTimeoutMs / 2 // KAFKA-14392
       )
       lifecycleManager.start(
         () => metadataListener.highestMetadataOffset,
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 99e86722f2f..92754a793f5 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -211,7 +211,7 @@ class BrokerToControllerChannelManagerImpl(
         50,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
-        config.requestTimeoutMs,
+        Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt, // request timeout should not exceed the provided retry 
timeout
         config.connectionSetupTimeoutMs,
         config.connectionSetupTimeoutMaxMs,
         time,
@@ -283,7 +283,7 @@ class BrokerToControllerRequestThread(
   time: Time,
   threadName: String,
   retryTimeoutMs: Long
-) extends InterBrokerSendThread(threadName, networkClient, 
config.controllerSocketTimeoutMs, time, isInterruptible = false) {
+) extends InterBrokerSendThread(threadName, networkClient, 
Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt, time, isInterruptible = false) {
 
   private val requestQueue = new 
LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 657c2965533..1c3586263a7 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -576,7 +576,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     val heartbeatRequest = request.body[BrokerHeartbeatRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs))
+      requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs / 2))
     controller.processBrokerHeartbeat(context, 
heartbeatRequest.data).handle[Unit] { (reply, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  reply: BrokerHeartbeatReply,

Reply via email to