This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e446f0b05f KAFKA-19047: Allow quickly re-registering brokers that are 
in controlled shutdown (#19296)
6e446f0b05f is described below

commit 6e446f0b05fa205e60ea2a07f5997cdd7ed0ef3f
Author: Alyssa Huang <[email protected]>
AuthorDate: Tue Apr 8 13:39:04 2025 -0700

    KAFKA-19047: Allow quickly re-registering brokers that are in controlled 
shutdown (#19296)
    
    Allow re-registration of brokers with active sessions if the previous 
broker registration was in controlled shutdown.
    
    Reviewers: Colin P. McCabe <[email protected]>, José Armando 
García Sancio <[email protected]>, David Mao <[email protected]>
---
 .../kafka/controller/ClusterControlManager.java    |   6 +-
 .../controller/ClusterControlManagerTest.java      | 127 +++++++++++++++++++++
 2 files changed, 130 insertions(+), 3 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index c9064f991f0..7a52e23f92c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -353,10 +353,10 @@ public class ClusterControlManager {
         if (existing != null) {
             prevIncarnationId = existing.incarnationId();
             storedBrokerEpoch = existing.epoch();
-            if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
+            if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) 
&& !existing.inControlledShutdown()) {
                 if (!request.incarnationId().equals(prevIncarnationId)) {
-                    throw new DuplicateBrokerRegistrationException("Another 
broker is " +
-                        "registered with that broker id.");
+                    throw new DuplicateBrokerRegistrationException("Another 
broker is registered with that broker id. If the broker " + 
+                            "was recently restarted this should self-resolve 
once the heartbeat manager expires the broker's session.");
                 }
             }
         }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 91e8d1a77f3..93a41c5d0e3 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.controller;
 import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.InvalidRegistrationException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -70,6 +71,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -978,6 +980,131 @@ public class ClusterControlManagerTest {
             contactTime(new BrokerIdAndEpoch(2, 100)));
     }
 
+    @Test
+    public void testDuplicateBrokerRegistrationWithActiveOldBroker() {
+        // active here means brokerHeartbeatManager last recorded the broker 
as unfenced and not in controlled shutdown
+        long brokerSessionTimeoutMs = 1000;
+        MockTime time = new MockTime(0L, 20L, 1000L);
+        FinalizedControllerFeatures finalizedFeatures = new 
FinalizedControllerFeatures(
+            Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
+        ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
+            setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+            setFeatureControlManager(createFeatureControlManager()).
+            setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { 
}).
+            
setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)).
+            setTime(time).
+            build();
+        clusterControl.replay(new RegisterBrokerRecord().
+            setBrokerEpoch(100).
+            setBrokerId(0).
+            setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
+            setFenced(false), 10002);
+        clusterControl.activate();
+        assertEquals(OptionalLong.of(1000L), 
clusterControl.heartbeatManager().tracker().
+            contactTime(new BrokerIdAndEpoch(0, 100)));
+
+        // while session is still valid for old broker, duplicate requests 
should fail
+        time.sleep(brokerSessionTimeoutMs / 2);
+        assertThrows(DuplicateBrokerRegistrationException.class, () ->
+            clusterControl.registerBroker(new BrokerRegistrationRequestData().
+                    setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+                    setBrokerId(0).
+                    
setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
+                    setFeatures(new 
BrokerRegistrationRequestData.FeatureCollection(
+                        Set.of(new BrokerRegistrationRequestData.Feature().
+                            setName(MetadataVersion.FEATURE_NAME).
+                            
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+                            
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+                setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+                101L,
+                finalizedFeatures,
+                false));
+
+        // if session expires for broker, even if the broker was active the 
new registration will succeed
+        time.sleep(brokerSessionTimeoutMs);
+        clusterControl.registerBroker(new BrokerRegistrationRequestData().
+                setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+                setBrokerId(0).
+                setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
+                setFeatures(new 
BrokerRegistrationRequestData.FeatureCollection(
+                    Set.of(new BrokerRegistrationRequestData.Feature().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+                        
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+                setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+            101L,
+            finalizedFeatures,
+            false);
+    }
+
+    @Test
+    public void testDuplicateBrokerRegistrationWithInactiveBroker() {
+        // inactive here means brokerHeartbeatManager last recorded the broker 
as fenced or in controlled shutdown
+        long brokerSessionTimeoutMs = 1000;
+        MockTime time = new MockTime(0L, 20L, 1000L);
+        FinalizedControllerFeatures finalizedFeatures = new 
FinalizedControllerFeatures(
+            Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
+        ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
+            setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+            setFeatureControlManager(createFeatureControlManager()).
+            setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { 
}).
+            
setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)).
+            setTime(time).
+            build();
+        // first broker is fenced
+        clusterControl.replay(new RegisterBrokerRecord().
+            setBrokerEpoch(100).
+            setBrokerId(0).
+            setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
+            setFenced(true).
+            setInControlledShutdown(false), 10002);
+        // second broker is in controlled shutdown
+        clusterControl.replay(new RegisterBrokerRecord().
+            setBrokerEpoch(200).
+            setBrokerId(1).
+            setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
+            setFenced(false).
+            setInControlledShutdown(true), 20002);
+        clusterControl.activate();
+        
clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(1, 20002);
+
+        assertEquals(OptionalLong.of(1000L), 
clusterControl.heartbeatManager().tracker().
+            contactTime(new BrokerIdAndEpoch(0, 100)));
+        assertEquals(OptionalLong.of(1000L), 
clusterControl.heartbeatManager().tracker().
+            contactTime(new BrokerIdAndEpoch(1, 200)));
+
+        time.sleep(brokerSessionTimeoutMs / 2);
+        assertThrows(DuplicateBrokerRegistrationException.class, () ->
+            clusterControl.registerBroker(new BrokerRegistrationRequestData().
+                    setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+                    setBrokerId(0).
+                    
setLogDirs(List.of(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"))).
+                    setFeatures(new 
BrokerRegistrationRequestData.FeatureCollection(
+                        Set.of(new BrokerRegistrationRequestData.Feature().
+                            setName(MetadataVersion.FEATURE_NAME).
+                            
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+                            
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+                    
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+                101L,
+                finalizedFeatures,
+                false));
+        // new registration should succeed immediately only if the broker is 
in controlled shutdown, 
+        // even if the last heartbeat was within the session timeout 
+        clusterControl.registerBroker(new BrokerRegistrationRequestData().
+                setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+                setBrokerId(1).
+                setLogDirs(List.of(Uuid.fromString("b66ybsWIQoygs01vdjH07A"))).
+                setFeatures(new 
BrokerRegistrationRequestData.FeatureCollection(
+                    Set.of(new BrokerRegistrationRequestData.Feature().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+                        
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+                setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")),
+            201L,
+            finalizedFeatures,
+            false);
+    }
+
     private FeatureControlManager createFeatureControlManager() {
         FeatureControlManager featureControlManager = new 
FeatureControlManager.Builder().build();
         featureControlManager.replay(new FeatureLevelRecord().

Reply via email to