This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95c1dab5d15 [fix][broker] ExtensibleLoadManager: handle
SessionReestablished and Reconnected events to re-register broker metadata
(#24932)
95c1dab5d15 is described below
commit 95c1dab5d15bc5cc4ba908f599c77ded892c34b2
Author: Dream95 <[email protected]>
AuthorDate: Mon Nov 3 21:00:31 2025 +0800
[fix][broker] ExtensibleLoadManager: handle SessionReestablished and
Reconnected events to re-register broker metadata (#24932)
Signed-off-by: Dream95 <[email protected]>
---
.../broker/loadbalance/extensions/BrokerRegistryImpl.java | 14 ++++++++++++++
.../apache/pulsar/broker/service/ZkSessionExpireTest.java | 4 ++--
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 115bbe56ffa..58559131af8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
/**
* The broker registry impl, base on the LockManager.
@@ -112,6 +113,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
throw new PulsarServerException("Cannot start the broker registry in state " + state.get());
}
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
+ pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
try {
this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException
e) {
@@ -282,6 +284,18 @@ public class BrokerRegistryImpl implements BrokerRegistry {
}
}
+ private void handleMetadataSessionEvent(SessionEvent event) {
+ if (!this.isStarted()) {
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Handle metadata session event: [{}]", event);
+ }
+ if (event == SessionEvent.SessionReestablished || event == SessionEvent.Reconnected) {
+ this.registerAsyncWithRetries();
+ }
+ }
+
@VisibleForTesting
protected static boolean isVerifiedNotification(Notification t) {
return t.getPath().startsWith(LOADBALANCE_BROKERS_ROOT + "/")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
index f9632625e68..e19fd09d062 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
@@ -202,11 +202,11 @@ public class ZkSessionExpireTest extends NetworkErrorTestBase {
metadataZKProxy.unRejectAllConnections();
Awaitility.await().untilAsserted(() -> {
Set<String> availableBrokers1 = getAvailableBrokers(pulsar1);
- Set<String> availableBrokers2 = getAvailableBrokers(pulsar1);
+ Set<String> availableBrokers2 = getAvailableBrokers(pulsar2);
log.info("Available brokers 1: {}", availableBrokers1);
log.info("Available brokers 2: {}", availableBrokers2);
assertEquals(availableBrokers1.size(), 2);
- assertEquals(availableBrokers1.size(), 2);
+ assertEquals(availableBrokers2.size(), 2);
});
// Verify: the topic on broker-1 will be unloaded.