This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new eda3d76ead2 KAFKA-14292; Fix KRaft controlled shutdown delay (#12736)
eda3d76ead2 is described below

commit eda3d76ead2aec87dfef347bc922451ee56e687b
Author: Alyssa Huang <ahu...@confluent.io>
AuthorDate: Thu Oct 13 13:29:45 2022 -0700

    KAFKA-14292; Fix KRaft controlled shutdown delay (#12736)
    
    The `controlledShutDownOffset` is defined as the "offset at which the 
broker should complete its controlled shutdown, or -1 if the broker is not 
performing a controlled shutdown". The controller sets this offset to a 
non-negative integer on receiving a heartbeat from a broker that's in 
controlled shutdown state. Currently, this offset is being updated and bumped 
every single time a broker in controlled shutdown mode sends a heartbeat, 
delaying when controlled shutdown can actually comp [...]
    
    Reviewers: Luke Chen <show...@gmail.com>, Jason Gustafson 
<ja...@confluent.io>
---
 .../kafka/controller/BrokerHeartbeatManager.java   | 43 +++++++++++++++-------
 .../apache/kafka/controller/QuorumController.java  |  2 +-
 .../controller/BrokerHeartbeatManagerTest.java     | 30 +++++++++++++--
 3 files changed, 56 insertions(+), 19 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index 428f1c5833e..f061cd95034 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller;
 
+import java.util.OptionalLong;
 import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -42,8 +43,9 @@ import static 
org.apache.kafka.controller.BrokerControlState.UNFENCED;
 /**
  * The BrokerHeartbeatManager manages all the soft state associated with 
broker heartbeats.
  * Soft state is state which does not appear in the metadata log.  This state 
includes
- * things like the last time each broker sent us a heartbeat, and whether the 
broker is
- * trying to perform a controlled shutdown.
+ * things like the last time each broker sent us a heartbeat.  As of KIP-841, 
the controlled
+ * shutdown state is no longer treated as soft state and is persisted to the 
metadata log on broker
+ * controlled shutdown requests.
  *
  * Only the active controller has a BrokerHeartbeatManager, since only the 
active
  * controller handles broker heartbeats.  Standby controllers will create a 
heartbeat
@@ -77,7 +79,7 @@ public class BrokerHeartbeatManager {
          * if the broker is not performing a controlled shutdown.  When this 
field is
          * updated, we also have to update the broker's position in the 
shuttingDown set.
          */
-        private long controlledShutDownOffset;
+        private long controlledShutdownOffset;
 
         /**
          * The previous entry in the unfenced list, or null if the broker is 
not in that list.
@@ -95,7 +97,7 @@ public class BrokerHeartbeatManager {
             this.prev = null;
             this.next = null;
             this.metadataOffset = -1;
-            this.controlledShutDownOffset = -1;
+            this.controlledShutdownOffset = -1;
         }
 
         /**
@@ -116,7 +118,7 @@ public class BrokerHeartbeatManager {
          * Returns true only if the broker is in controlled shutdown state.
          */
         boolean shuttingDown() {
-            return controlledShutDownOffset >= 0;
+            return controlledShutdownOffset >= 0;
         }
     }
 
@@ -275,6 +277,16 @@ public class BrokerHeartbeatManager {
         return brokers.values();
     }
 
+    // VisibleForTesting
+    OptionalLong controlledShutdownOffset(int brokerId) {
+        BrokerHeartbeatState broker = brokers.get(brokerId);
+        if (broker == null || broker.controlledShutdownOffset == -1) {
+            return OptionalLong.empty();
+        }
+        return OptionalLong.of(broker.controlledShutdownOffset);
+    }
+
+
     /**
      * Mark a broker as fenced.
      *
@@ -381,7 +393,7 @@ public class BrokerHeartbeatManager {
         if (fenced) {
             // If a broker is fenced, it leaves controlled shutdown.  On its 
next heartbeat,
             // it will shut down immediately.
-            broker.controlledShutDownOffset = -1;
+            broker.controlledShutdownOffset = -1;
         } else {
             unfenced.add(broker);
             if (!broker.shuttingDown()) {
@@ -400,12 +412,13 @@ public class BrokerHeartbeatManager {
     }
 
     /**
-     * Mark a broker as being in the controlled shutdown state.
+     * Mark a broker as being in the controlled shutdown state. We only update 
the
+     * controlledShutdownOffset if the broker was previously not in controlled 
shutdown state.
      *
      * @param brokerId                  The broker id.
      * @param controlledShutDownOffset  The offset at which controlled 
shutdown will be complete.
      */
-    void updateControlledShutdownOffset(int brokerId, long 
controlledShutDownOffset) {
+    void maybeUpdateControlledShutdownOffset(int brokerId, long 
controlledShutDownOffset) {
         BrokerHeartbeatState broker = brokers.get(brokerId);
         if (broker == null) {
             throw new RuntimeException("Unable to locate broker " + brokerId);
@@ -414,9 +427,11 @@ public class BrokerHeartbeatManager {
             throw new RuntimeException("Fenced brokers cannot enter controlled 
shutdown.");
         }
         active.remove(broker);
-        broker.controlledShutDownOffset = controlledShutDownOffset;
-        log.debug("Updated the controlled shutdown offset for broker {} to 
{}.",
-            brokerId, controlledShutDownOffset);
+        if (broker.controlledShutdownOffset < 0) {
+            broker.controlledShutdownOffset = controlledShutDownOffset;
+            log.debug("Updated the controlled shutdown offset for broker {} to 
{}.",
+                brokerId, controlledShutDownOffset);
+        }
     }
 
     /**
@@ -581,17 +596,17 @@ public class BrokerHeartbeatManager {
                     return new BrokerControlStates(currentState, 
CONTROLLED_SHUTDOWN);
                 }
                 long lowestActiveOffset = lowestActiveOffset();
-                if (broker.controlledShutDownOffset <= lowestActiveOffset) {
+                if (broker.controlledShutdownOffset <= lowestActiveOffset) {
                     log.info("The request from broker {} to shut down has been 
granted " +
                         "since the lowest active offset {} is now greater than 
the " +
                         "broker's controlled shutdown offset {}.", brokerId,
-                        lowestActiveOffset, broker.controlledShutDownOffset);
+                        lowestActiveOffset, broker.controlledShutdownOffset);
                     return new BrokerControlStates(currentState, SHUTDOWN_NOW);
                 }
                 log.debug("The request from broker {} to shut down can not yet 
be granted " +
                     "because the lowest active offset {} is not greater than 
the broker's " +
                     "shutdown offset {}.", brokerId, lowestActiveOffset,
-                    broker.controlledShutDownOffset);
+                    broker.controlledShutdownOffset);
                 return new BrokerControlStates(currentState, 
CONTROLLED_SHUTDOWN);
 
             default:
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 034ef1e216c..2255c741535 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1994,7 +1994,7 @@ public final class QuorumController implements Controller 
{
                 public void processBatchEndOffset(long offset) {
                     if (inControlledShutdown) {
                         clusterControl.heartbeatManager().
-                            updateControlledShutdownOffset(brokerId, offset);
+                            maybeUpdateControlledShutdownOffset(brokerId, 
offset);
                     }
                 }
             });
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
index 28387b17a0f..d325e4f88cc 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.TreeSet;
 import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
@@ -203,17 +204,38 @@ public class BrokerHeartbeatManagerTest {
         expected.add(new UsableBroker(3, Optional.of("rack2"), false));
         expected.add(new UsableBroker(4, Optional.of("rack1"), true));
         assertEquals(expected, usableBrokersToSet(manager));
-        manager.updateControlledShutdownOffset(2, 0);
+        manager.maybeUpdateControlledShutdownOffset(2, 0);
         assertEquals(100L, manager.lowestActiveOffset());
         assertThrows(RuntimeException.class,
-            () -> manager.updateControlledShutdownOffset(4, 0));
+            () -> manager.maybeUpdateControlledShutdownOffset(4, 0));
         manager.touch(4, false, 100);
-        manager.updateControlledShutdownOffset(4, 0);
+        manager.maybeUpdateControlledShutdownOffset(4, 0);
         expected.remove(new UsableBroker(2, Optional.of("rack1"), false));
         expected.remove(new UsableBroker(4, Optional.of("rack1"), true));
         assertEquals(expected, usableBrokersToSet(manager));
     }
 
+    @Test
+    public void testControlledShutdownOffsetIsOnlyUpdatedOnce() {
+        BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
+        assertEquals(Collections.emptySet(), usableBrokersToSet(manager));
+        manager.touch(0, false, 100);
+        manager.touch(1, false, 100);
+        manager.touch(2, false, 98);
+        manager.touch(3, false, 100);
+        manager.touch(4, true, 100);
+        assertEquals(OptionalLong.empty(), 
manager.controlledShutdownOffset(2));
+        manager.maybeUpdateControlledShutdownOffset(2, 98);
+        assertEquals(OptionalLong.of(98), manager.controlledShutdownOffset(2));
+        manager.maybeUpdateControlledShutdownOffset(2, 99);
+        assertEquals(OptionalLong.of(98), manager.controlledShutdownOffset(2));
+        assertEquals(OptionalLong.empty(), 
manager.controlledShutdownOffset(3));
+        manager.maybeUpdateControlledShutdownOffset(3, 101);
+        assertEquals(OptionalLong.of(101), 
manager.controlledShutdownOffset(3));
+        manager.maybeUpdateControlledShutdownOffset(3, 102);
+        assertEquals(OptionalLong.of(101), 
manager.controlledShutdownOffset(3));
+    }
+
     @Test
     public void testBrokerHeartbeatStateList() {
         BrokerHeartbeatStateList list = new BrokerHeartbeatStateList();
@@ -256,7 +278,7 @@ public class BrokerHeartbeatManagerTest {
         manager.touch(3, false, 100);
         manager.touch(4, true, 100);
         manager.touch(5, false, 99);
-        manager.updateControlledShutdownOffset(5, 99);
+        manager.maybeUpdateControlledShutdownOffset(5, 99);
 
         assertEquals(98L, manager.lowestActiveOffset());
 

Reply via email to