This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c7c82baf873 MINOR: Always send cumulative failed dirs in HB request (#14770) c7c82baf873 is described below commit c7c82baf873011a0d79ab0bcfcde53205c26af4c Author: Igor Soarez <soa...@apple.com> AuthorDate: Tue Nov 21 00:18:38 2023 +0000 MINOR: Always send cumulative failed dirs in HB request (#14770) Instead of only sending failed log directory UUIDs in the heartbeat request until a successful response is received, the broker sends the full cumulative set of failed directories since startup time. This aims to simplify the handling of log directory failures on the controller side, given that overload-mode handling of heartbeat requests returns an undifferentiated reply. Reviewers: Colin P. McCabe <cmcc...@apache.org>, Proven Provenzano <pprovenz...@confluent.io> --- .../kafka/server/BrokerLifecycleManager.scala | 21 +++++++------- .../kafka/server/BrokerLifecycleManagerTest.scala | 33 +++++++++++----------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala index 0b74ad0a3e4..fd2c2cc8e45 100644 --- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala +++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala @@ -149,10 +149,10 @@ class BrokerLifecycleManager( private var readyToUnfence = false /** - * List of offline directories pending to be sent. + * List of accumulated offline directories. * This variable can only be read or written from the event queue thread. 
*/ - private var offlineDirsPending = Set[Uuid]() + private var offlineDirs = Set[Uuid]() /** * True if we sent a event queue to the active controller requesting controlled @@ -300,10 +300,10 @@ class BrokerLifecycleManager( private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event { override def run(): Unit = { - if (offlineDirsPending.isEmpty) { - offlineDirsPending = Set(dir) + if (offlineDirs.isEmpty) { + offlineDirs = Set(dir) } else { - offlineDirsPending = offlineDirsPending + dir + offlineDirs = offlineDirs + dir } if (registered) { scheduleNextCommunicationImmediately() @@ -424,15 +424,15 @@ class BrokerLifecycleManager( setCurrentMetadataOffset(metadataOffset). setWantFence(!readyToUnfence). setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN). - setOfflineLogDirs(offlineDirsPending.toSeq.asJava) + setOfflineLogDirs(offlineDirs.toSeq.asJava) if (isTraceEnabled) { trace(s"Sending broker heartbeat $data") } - val handler = new BrokerHeartbeatResponseHandler(offlineDirsPending) + val handler = new BrokerHeartbeatResponseHandler() _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler) } - private class BrokerHeartbeatResponseHandler(dirsInFlight: Set[Uuid]) extends ControllerRequestCompletionHandler { + private class BrokerHeartbeatResponseHandler() extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { if (response.authenticationException() != null) { error(s"Unable to send broker heartbeat for $nodeId because of an " + @@ -456,7 +456,7 @@ class BrokerLifecycleManager( // this response handler is not invoked from the event handler thread, // and processing a successful heartbeat response requires updating // state, so to continue we need to schedule an event - eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data(), dirsInFlight)) + eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data())) } else { warn(s"Broker $nodeId sent a heartbeat 
request but received error $errorCode.") scheduleNextCommunicationAfterFailure() @@ -470,10 +470,9 @@ class BrokerLifecycleManager( } } - private class BrokerHeartbeatResponseEvent(response: BrokerHeartbeatResponseData, dirsInFlight: Set[Uuid]) extends EventQueue.Event { + private class BrokerHeartbeatResponseEvent(response: BrokerHeartbeatResponseData) extends EventQueue.Event { override def run(): Unit = { failedAttempts = 0 - offlineDirsPending = offlineDirsPending.diff(dirsInFlight) _state match { case BrokerState.STARTING => if (response.isCaughtUp) { diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala index 0bc993d55df..1d5afa42502 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala @@ -207,34 +207,35 @@ class BrokerLifecycleManagerTest { } @Test - def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = { + def testAlwaysSendsAccumulatedOfflineDirs(): Unit = { val ctx = new RegistrationTestContext(configProperties) val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false) val controllerNode = new Node(3000, "localhost", 8021) ctx.controllerNodeProvider.node.set(controllerNode) val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000))) - val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData() - .setErrorCode(Errors.NOT_CONTROLLER.code()))) - val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) - val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) + val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))) - val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) - offlineDirs.foreach(manager.propagateDirectoryFailure) - - // start the manager late to prevent a race, and force expectations on the first heartbeat manager.start(() => ctx.highestMetadataOffset.get(), ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.empty()) - poll(ctx, manager, registration) - val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs() - val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs() - val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs() - assertEquals(offlineDirs, dirs1.asScala.toSet) - assertEquals(offlineDirs, dirs2.asScala.toSet) - assertEquals(Set.empty, dirs3.asScala.toSet) + manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA")) + poll(ctx, manager, heartbeats(0)).data() + val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs() + + manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) + poll(ctx, manager, heartbeats(2)).data() + val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs() + + manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg")) + poll(ctx, manager, heartbeats(4)).data() + val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs() + + assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), dirs1.asScala.toSet) + assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet) + assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", "1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet) manager.close() }