This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2dc4eecac77517f436eb727e380885bf27a2e6ad Author: Dream95 <[email protected]> AuthorDate: Mon Nov 3 21:00:31 2025 +0800 [fix][broker] ExtensibleLoadManager: handle SessionReestablished and Reconnected events to re-register broker metadata (#24932) Signed-off-by: Dream95 <[email protected]> (cherry picked from commit 95c1dab5d15bc5cc4ba908f599c77ded892c34b2) --- .../broker/loadbalance/extensions/BrokerRegistryImpl.java | 14 ++++++++++++++ .../apache/pulsar/broker/service/ZkSessionExpireTest.java | 4 ++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 115bbe56ffa..58559131af8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -45,6 +45,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.CreateOption; +import org.apache.pulsar.metadata.api.extended.SessionEvent; /** * The broker registry impl, base on the LockManager. 
@@ -112,6 +113,7 @@ public class BrokerRegistryImpl implements BrokerRegistry { throw new PulsarServerException("Cannot start the broker registry in state " + state.get()); } pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); + pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent); try { this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { @@ -282,6 +284,18 @@ public class BrokerRegistryImpl implements BrokerRegistry { } } + private void handleMetadataSessionEvent(SessionEvent event) { + if (!this.isStarted()) { + return; + } + if (log.isDebugEnabled()) { + log.debug("Handle metadata session event: [{}]", event); + } + if (event == SessionEvent.SessionReestablished || event == SessionEvent.Reconnected) { + this.registerAsyncWithRetries(); + } + } + @VisibleForTesting protected static boolean isVerifiedNotification(Notification t) { return t.getPath().startsWith(LOADBALANCE_BROKERS_ROOT + "/") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java index f9632625e68..e19fd09d062 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java @@ -202,11 +202,11 @@ public class ZkSessionExpireTest extends NetworkErrorTestBase { metadataZKProxy.unRejectAllConnections(); Awaitility.await().untilAsserted(() -> { Set<String> availableBrokers1 = getAvailableBrokers(pulsar1); - Set<String> availableBrokers2 = getAvailableBrokers(pulsar1); + Set<String> availableBrokers2 = getAvailableBrokers(pulsar2); log.info("Available brokers 1: {}", availableBrokers1); log.info("Available brokers 2: {}", availableBrokers2); 
assertEquals(availableBrokers1.size(), 2); - assertEquals(availableBrokers1.size(), 2); + assertEquals(availableBrokers2.size(), 2); }); // Verify: the topic on broker-1 will be unloaded.
