This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e446f0b05f KAFKA-19047: Allow quickly re-registering brokers that are
in controlled shutdown (#19296)
6e446f0b05f is described below
commit 6e446f0b05fa205e60ea2a07f5997cdd7ed0ef3f
Author: Alyssa Huang <[email protected]>
AuthorDate: Tue Apr 8 13:39:04 2025 -0700
KAFKA-19047: Allow quickly re-registering brokers that are in controlled
shutdown (#19296)
Allow re-registration of brokers with active sessions if the previous
broker registration was in controlled shutdown.
Reviewers: Colin P. McCabe <[email protected]>, José Armando
García Sancio <[email protected]>, David Mao <[email protected]>
---
.../kafka/controller/ClusterControlManager.java | 6 +-
.../controller/ClusterControlManagerTest.java | 127 +++++++++++++++++++++
2 files changed, 130 insertions(+), 3 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index c9064f991f0..7a52e23f92c 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -353,10 +353,10 @@ public class ClusterControlManager {
if (existing != null) {
prevIncarnationId = existing.incarnationId();
storedBrokerEpoch = existing.epoch();
- if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
+ if (heartbeatManager.hasValidSession(brokerId, existing.epoch())
&& !existing.inControlledShutdown()) {
if (!request.incarnationId().equals(prevIncarnationId)) {
- throw new DuplicateBrokerRegistrationException("Another
broker is " +
- "registered with that broker id.");
+ throw new DuplicateBrokerRegistrationException("Another
broker is registered with that broker id. If the broker " +
+ "was recently restarted this should self-resolve
once the heartbeat manager expires the broker's session.");
}
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 91e8d1a77f3..93a41c5d0e3 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -70,6 +71,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -978,6 +980,131 @@ public class ClusterControlManagerTest {
contactTime(new BrokerIdAndEpoch(2, 100)));
}
+ @Test
+ public void testDuplicateBrokerRegistrationWithActiveOldBroker() {
+ // active here means brokerHeartbeatManager last recorded the broker
as unfenced and not in controlled shutdown
+ long brokerSessionTimeoutMs = 1000;
+ MockTime time = new MockTime(0L, 20L, 1000L);
+ FinalizedControllerFeatures finalizedFeatures = new
FinalizedControllerFeatures(
+ Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
+ ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
+ setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+ setFeatureControlManager(createFeatureControlManager()).
+ setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> {
}).
+
setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)).
+ setTime(time).
+ build();
+ clusterControl.replay(new RegisterBrokerRecord().
+ setBrokerEpoch(100).
+ setBrokerId(0).
+ setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
+ setFenced(false), 10002);
+ clusterControl.activate();
+ assertEquals(OptionalLong.of(1000L),
clusterControl.heartbeatManager().tracker().
+ contactTime(new BrokerIdAndEpoch(0, 100)));
+
+ // while session is still valid for old broker, duplicate requests
should fail
+ time.sleep(brokerSessionTimeoutMs / 2);
+ assertThrows(DuplicateBrokerRegistrationException.class, () ->
+ clusterControl.registerBroker(new BrokerRegistrationRequestData().
+ setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+ setBrokerId(0).
+
setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
+ setFeatures(new
BrokerRegistrationRequestData.FeatureCollection(
+ Set.of(new BrokerRegistrationRequestData.Feature().
+ setName(MetadataVersion.FEATURE_NAME).
+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+ setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+ 101L,
+ finalizedFeatures,
+ false));
+
+ // if session expires for broker, even if the broker was active the
new registration will succeed
+ time.sleep(brokerSessionTimeoutMs);
+ clusterControl.registerBroker(new BrokerRegistrationRequestData().
+ setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+ setBrokerId(0).
+ setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
+ setFeatures(new
BrokerRegistrationRequestData.FeatureCollection(
+ Set.of(new BrokerRegistrationRequestData.Feature().
+ setName(MetadataVersion.FEATURE_NAME).
+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+ setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+ 101L,
+ finalizedFeatures,
+ false);
+ }
+
+ @Test
+ public void testDuplicateBrokerRegistrationWithInactiveBroker() {
+ // inactive here means brokerHeartbeatManager last recorded the broker
as fenced or in controlled shutdown
+ long brokerSessionTimeoutMs = 1000;
+ MockTime time = new MockTime(0L, 20L, 1000L);
+ FinalizedControllerFeatures finalizedFeatures = new
FinalizedControllerFeatures(
+ Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
+ ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
+ setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+ setFeatureControlManager(createFeatureControlManager()).
+ setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> {
}).
+
setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)).
+ setTime(time).
+ build();
+ // first broker is fenced
+ clusterControl.replay(new RegisterBrokerRecord().
+ setBrokerEpoch(100).
+ setBrokerId(0).
+ setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
+ setFenced(true).
+ setInControlledShutdown(false), 10002);
+ // second broker is in controlled shutdown
+ clusterControl.replay(new RegisterBrokerRecord().
+ setBrokerEpoch(200).
+ setBrokerId(1).
+ setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
+ setFenced(false).
+ setInControlledShutdown(true), 20002);
+ clusterControl.activate();
+
clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(1, 20002);
+
+ assertEquals(OptionalLong.of(1000L),
clusterControl.heartbeatManager().tracker().
+ contactTime(new BrokerIdAndEpoch(0, 100)));
+ assertEquals(OptionalLong.of(1000L),
clusterControl.heartbeatManager().tracker().
+ contactTime(new BrokerIdAndEpoch(1, 200)));
+
+ time.sleep(brokerSessionTimeoutMs / 2);
+ assertThrows(DuplicateBrokerRegistrationException.class, () ->
+ clusterControl.registerBroker(new BrokerRegistrationRequestData().
+ setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+ setBrokerId(0).
+
setLogDirs(List.of(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"))).
+ setFeatures(new
BrokerRegistrationRequestData.FeatureCollection(
+ Set.of(new BrokerRegistrationRequestData.Feature().
+ setName(MetadataVersion.FEATURE_NAME).
+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+ 101L,
+ finalizedFeatures,
+ false));
+ // new registration should succeed immediately only if the broker is
in controlled shutdown,
+ // even if the last heartbeat was within the session timeout
+ clusterControl.registerBroker(new BrokerRegistrationRequestData().
+ setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+ setBrokerId(1).
+ setLogDirs(List.of(Uuid.fromString("b66ybsWIQoygs01vdjH07A"))).
+ setFeatures(new
BrokerRegistrationRequestData.FeatureCollection(
+ Set.of(new BrokerRegistrationRequestData.Feature().
+ setName(MetadataVersion.FEATURE_NAME).
+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+ setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")),
+ 201L,
+ finalizedFeatures,
+ false);
+ }
+
private FeatureControlManager createFeatureControlManager() {
FeatureControlManager featureControlManager = new
FeatureControlManager.Builder().build();
featureControlManager.replay(new FeatureLevelRecord().