This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new eda3d76ead2 KAFKA-14292; Fix KRaft controlled shutdown delay (#12736) eda3d76ead2 is described below commit eda3d76ead2aec87dfef347bc922451ee56e687b Author: Alyssa Huang <ahu...@confluent.io> AuthorDate: Thu Oct 13 13:29:45 2022 -0700 KAFKA-14292; Fix KRaft controlled shutdown delay (#12736) The `controlledShutDownOffset` is defined as the "offset at which the broker should complete its controlled shutdown, or -1 if the broker is not performing a controlled shutdown". The controller sets this offset to a non-negative integer on receiving a heartbeat from a broker that's in controlled shutdown state. Currently, this offset is being updated and bumped every single time a broker in controlled shutdown mode sends a heartbeat, delaying when controlled shutdown can actually comp [...] Reviewers: Luke Chen <show...@gmail.com>, Jason Gustafson <ja...@confluent.io> --- .../kafka/controller/BrokerHeartbeatManager.java | 43 +++++++++++++++------- .../apache/kafka/controller/QuorumController.java | 2 +- .../controller/BrokerHeartbeatManagerTest.java | 30 +++++++++++++-- 3 files changed, 56 insertions(+), 19 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index 428f1c5833e..f061cd95034 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -17,6 +17,7 @@ package org.apache.kafka.controller; +import java.util.OptionalLong; import org.apache.kafka.common.message.BrokerHeartbeatRequestData; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -42,8 +43,9 @@ import static org.apache.kafka.controller.BrokerControlState.UNFENCED; /** * The BrokerHeartbeatManager manages all the soft state associated with broker 
heartbeats. * Soft state is state which does not appear in the metadata log. This state includes - * things like the last time each broker sent us a heartbeat, and whether the broker is - * trying to perform a controlled shutdown. + * things like the last time each broker sent us a heartbeat. As of KIP-841, the controlled + * shutdown state is no longer treated as soft state and is persisted to the metadata log on broker + * controlled shutdown requests. * * Only the active controller has a BrokerHeartbeatManager, since only the active * controller handles broker heartbeats. Standby controllers will create a heartbeat @@ -77,7 +79,7 @@ public class BrokerHeartbeatManager { * if the broker is not performing a controlled shutdown. When this field is * updated, we also have to update the broker's position in the shuttingDown set. */ - private long controlledShutDownOffset; + private long controlledShutdownOffset; /** * The previous entry in the unfenced list, or null if the broker is not in that list. @@ -95,7 +97,7 @@ public class BrokerHeartbeatManager { this.prev = null; this.next = null; this.metadataOffset = -1; - this.controlledShutDownOffset = -1; + this.controlledShutdownOffset = -1; } /** @@ -116,7 +118,7 @@ public class BrokerHeartbeatManager { * Returns true only if the broker is in controlled shutdown state. */ boolean shuttingDown() { - return controlledShutDownOffset >= 0; + return controlledShutdownOffset >= 0; } } @@ -275,6 +277,16 @@ public class BrokerHeartbeatManager { return brokers.values(); } + // VisibleForTesting + OptionalLong controlledShutdownOffset(int brokerId) { + BrokerHeartbeatState broker = brokers.get(brokerId); + if (broker == null || broker.controlledShutdownOffset == -1) { + return OptionalLong.empty(); + } + return OptionalLong.of(broker.controlledShutdownOffset); + } + + /** * Mark a broker as fenced. * @@ -381,7 +393,7 @@ public class BrokerHeartbeatManager { if (fenced) { // If a broker is fenced, it leaves controlled shutdown. 
On its next heartbeat, // it will shut down immediately. - broker.controlledShutDownOffset = -1; + broker.controlledShutdownOffset = -1; } else { unfenced.add(broker); if (!broker.shuttingDown()) { @@ -400,12 +412,13 @@ public class BrokerHeartbeatManager { } /** - * Mark a broker as being in the controlled shutdown state. + * Mark a broker as being in the controlled shutdown state. We only update the + * controlledShutdownOffset if the broker was previously not in controlled shutdown state. * * @param brokerId The broker id. * @param controlledShutDownOffset The offset at which controlled shutdown will be complete. */ - void updateControlledShutdownOffset(int brokerId, long controlledShutDownOffset) { + void maybeUpdateControlledShutdownOffset(int brokerId, long controlledShutDownOffset) { BrokerHeartbeatState broker = brokers.get(brokerId); if (broker == null) { throw new RuntimeException("Unable to locate broker " + brokerId); @@ -414,9 +427,11 @@ public class BrokerHeartbeatManager { throw new RuntimeException("Fenced brokers cannot enter controlled shutdown."); } active.remove(broker); - broker.controlledShutDownOffset = controlledShutDownOffset; - log.debug("Updated the controlled shutdown offset for broker {} to {}.", - brokerId, controlledShutDownOffset); + if (broker.controlledShutdownOffset < 0) { + broker.controlledShutdownOffset = controlledShutDownOffset; + log.debug("Updated the controlled shutdown offset for broker {} to {}.", + brokerId, controlledShutDownOffset); + } } /** @@ -581,17 +596,17 @@ public class BrokerHeartbeatManager { return new BrokerControlStates(currentState, CONTROLLED_SHUTDOWN); } long lowestActiveOffset = lowestActiveOffset(); - if (broker.controlledShutDownOffset <= lowestActiveOffset) { + if (broker.controlledShutdownOffset <= lowestActiveOffset) { log.info("The request from broker {} to shut down has been granted " + "since the lowest active offset {} is now greater than the " + "broker's controlled shutdown offset {}.", 
brokerId, - lowestActiveOffset, broker.controlledShutDownOffset); + lowestActiveOffset, broker.controlledShutdownOffset); return new BrokerControlStates(currentState, SHUTDOWN_NOW); } log.debug("The request from broker {} to shut down can not yet be granted " + "because the lowest active offset {} is not greater than the broker's " + "shutdown offset {}.", brokerId, lowestActiveOffset, - broker.controlledShutDownOffset); + broker.controlledShutdownOffset); return new BrokerControlStates(currentState, CONTROLLED_SHUTDOWN); default: diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 034ef1e216c..2255c741535 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1994,7 +1994,7 @@ public final class QuorumController implements Controller { public void processBatchEndOffset(long offset) { if (inControlledShutdown) { clusterControl.heartbeatManager(). 
- updateControlledShutdownOffset(brokerId, offset); + maybeUpdateControlledShutdownOffset(brokerId, offset); } } }); diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java index 28387b17a0f..d325e4f88cc 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.TreeSet; import org.apache.kafka.common.message.BrokerHeartbeatRequestData; @@ -203,17 +204,38 @@ public class BrokerHeartbeatManagerTest { expected.add(new UsableBroker(3, Optional.of("rack2"), false)); expected.add(new UsableBroker(4, Optional.of("rack1"), true)); assertEquals(expected, usableBrokersToSet(manager)); - manager.updateControlledShutdownOffset(2, 0); + manager.maybeUpdateControlledShutdownOffset(2, 0); assertEquals(100L, manager.lowestActiveOffset()); assertThrows(RuntimeException.class, - () -> manager.updateControlledShutdownOffset(4, 0)); + () -> manager.maybeUpdateControlledShutdownOffset(4, 0)); manager.touch(4, false, 100); - manager.updateControlledShutdownOffset(4, 0); + manager.maybeUpdateControlledShutdownOffset(4, 0); expected.remove(new UsableBroker(2, Optional.of("rack1"), false)); expected.remove(new UsableBroker(4, Optional.of("rack1"), true)); assertEquals(expected, usableBrokersToSet(manager)); } + @Test + public void testControlledShutdownOffsetIsOnlyUpdatedOnce() { + BrokerHeartbeatManager manager = newBrokerHeartbeatManager(); + assertEquals(Collections.emptySet(), usableBrokersToSet(manager)); + manager.touch(0, false, 100); + manager.touch(1, false, 100); + manager.touch(2, false, 98); + manager.touch(3, false, 100); + 
manager.touch(4, true, 100); + assertEquals(OptionalLong.empty(), manager.controlledShutdownOffset(2)); + manager.maybeUpdateControlledShutdownOffset(2, 98); + assertEquals(OptionalLong.of(98), manager.controlledShutdownOffset(2)); + manager.maybeUpdateControlledShutdownOffset(2, 99); + assertEquals(OptionalLong.of(98), manager.controlledShutdownOffset(2)); + assertEquals(OptionalLong.empty(), manager.controlledShutdownOffset(3)); + manager.maybeUpdateControlledShutdownOffset(3, 101); + assertEquals(OptionalLong.of(101), manager.controlledShutdownOffset(3)); + manager.maybeUpdateControlledShutdownOffset(3, 102); + assertEquals(OptionalLong.of(101), manager.controlledShutdownOffset(3)); + } + @Test public void testBrokerHeartbeatStateList() { BrokerHeartbeatStateList list = new BrokerHeartbeatStateList(); @@ -256,7 +278,7 @@ public class BrokerHeartbeatManagerTest { manager.touch(3, false, 100); manager.touch(4, true, 100); manager.touch(5, false, 99); - manager.updateControlledShutdownOffset(5, 99); + manager.maybeUpdateControlledShutdownOffset(5, 99); assertEquals(98L, manager.lowestActiveOffset());