This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7c82baf873 MINOR: Always send cumulative failed dirs in HB request 
(#14770)
c7c82baf873 is described below

commit c7c82baf873011a0d79ab0bcfcde53205c26af4c
Author: Igor Soarez <soa...@apple.com>
AuthorDate: Tue Nov 21 00:18:38 2023 +0000

    MINOR: Always send cumulative failed dirs in HB request (#14770)
    
    Instead of only sending failed log directory UUIDs in the heartbeat
    request until a successful response is received, the broker sends
    the full cumulative set of failed directories since startup time.
    
    This aims to simplify the handling of log directory failure on the
    controller side, considering overload mode handling of heartbeat
    requests, which returns an undifferentiated reply.
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>, Proven Provenzano 
<pprovenz...@confluent.io>
---
 .../kafka/server/BrokerLifecycleManager.scala      | 21 +++++++-------
 .../kafka/server/BrokerLifecycleManagerTest.scala  | 33 +++++++++++-----------
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala 
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 0b74ad0a3e4..fd2c2cc8e45 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -149,10 +149,10 @@ class BrokerLifecycleManager(
   private var readyToUnfence = false
 
   /**
-   * List of offline directories pending to be sent.
+   * List of accumulated offline directories.
    * This variable can only be read or written from the event queue thread.
    */
-  private var offlineDirsPending = Set[Uuid]()
+  private var offlineDirs = Set[Uuid]()
 
   /**
    * True if we sent a event queue to the active controller requesting 
controlled
@@ -300,10 +300,10 @@ class BrokerLifecycleManager(
 
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
     override def run(): Unit = {
-      if (offlineDirsPending.isEmpty) {
-        offlineDirsPending = Set(dir)
+      if (offlineDirs.isEmpty) {
+        offlineDirs = Set(dir)
       } else {
-        offlineDirsPending = offlineDirsPending + dir
+        offlineDirs = offlineDirs + dir
       }
       if (registered) {
         scheduleNextCommunicationImmediately()
@@ -424,15 +424,15 @@ class BrokerLifecycleManager(
       setCurrentMetadataOffset(metadataOffset).
       setWantFence(!readyToUnfence).
       setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN).
-      setOfflineLogDirs(offlineDirsPending.toSeq.asJava)
+      setOfflineLogDirs(offlineDirs.toSeq.asJava)
     if (isTraceEnabled) {
       trace(s"Sending broker heartbeat $data")
     }
-    val handler = new BrokerHeartbeatResponseHandler(offlineDirsPending)
+    val handler = new BrokerHeartbeatResponseHandler()
     _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
handler)
   }
 
-  private class BrokerHeartbeatResponseHandler(dirsInFlight: Set[Uuid]) 
extends ControllerRequestCompletionHandler {
+  private class BrokerHeartbeatResponseHandler() extends 
ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
       if (response.authenticationException() != null) {
         error(s"Unable to send broker heartbeat for $nodeId because of an " +
@@ -456,7 +456,7 @@ class BrokerLifecycleManager(
           // this response handler is not invoked from the event handler 
thread,
           // and processing a successful heartbeat response requires updating
           // state, so to continue we need to schedule an event
-          eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data(), 
dirsInFlight))
+          eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data()))
         } else {
           warn(s"Broker $nodeId sent a heartbeat request but received error 
$errorCode.")
           scheduleNextCommunicationAfterFailure()
@@ -470,10 +470,9 @@ class BrokerLifecycleManager(
     }
   }
 
-  private class BrokerHeartbeatResponseEvent(response: 
BrokerHeartbeatResponseData, dirsInFlight: Set[Uuid]) extends EventQueue.Event {
+  private class BrokerHeartbeatResponseEvent(response: 
BrokerHeartbeatResponseData) extends EventQueue.Event {
     override def run(): Unit = {
       failedAttempts = 0
-      offlineDirsPending = offlineDirsPending.diff(dirsInFlight)
       _state match {
         case BrokerState.STARTING =>
           if (response.isCaughtUp) {
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 0bc993d55df..1d5afa42502 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -207,34 +207,35 @@ class BrokerLifecycleManagerTest {
   }
 
   @Test
-  def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = {
+  def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
     val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"offline-dirs-sent-in-heartbeat-", isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-    val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()
-      .setErrorCode(Errors.NOT_CONTROLLER.code())))
-    val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
-    val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, 
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
 
-    val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), 
Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-    offlineDirs.foreach(manager.propagateDirectoryFailure)
-
-    // start the manager late to prevent a race, and force expectations on the 
first heartbeat
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.empty())
-
     poll(ctx, manager, registration)
-    val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs()
-    val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs()
-    val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs()
 
-    assertEquals(offlineDirs, dirs1.asScala.toSet)
-    assertEquals(offlineDirs, dirs2.asScala.toSet)
-    assertEquals(Set.empty, dirs3.asScala.toSet)
+    
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    poll(ctx, manager, heartbeats(0)).data()
+    val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
+
+    
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    poll(ctx, manager, heartbeats(2)).data()
+    val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
+
+    
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
+    poll(ctx, manager, heartbeats(4)).data()
+    val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()
+
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), 
dirs1.asScala.toSet)
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", 
"ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet)
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", 
"1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet)
     manager.close()
   }
 

Reply via email to