This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 25feaf791ed69405ea7aa075b63d2fa42ca7ec72
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 27 19:49:02 2024 +0800

    [fix][broker] Fix the broker registry cannot recover from the metadata 
node deletion (#23359)
    
    (cherry picked from commit 95bd1d1dd3d447f0705a96092afbc9d6bd6cd1dc)
---
 .../loadbalance/extensions/BrokerRegistry.java     |   6 +-
 .../loadbalance/extensions/BrokerRegistryImpl.java |  57 ++++++----
 .../extensions/BrokerRegistryIntegrationTest.java  | 124 +++++++++++++++++++++
 .../loadbalance/extensions/BrokerRegistryTest.java |   6 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  |   2 +-
 5 files changed, 167 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
index 8133d4c4827..79dba9c6334 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
 
@@ -32,6 +34,8 @@ import org.apache.pulsar.metadata.api.NotificationType;
  * Responsible for registering the current Broker lookup info to
  * the distributed store (e.g. Zookeeper) for broker discovery.
  */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Unstable
 public interface BrokerRegistry extends AutoCloseable {
 
     /**
@@ -47,7 +51,7 @@ public interface BrokerRegistry extends AutoCloseable {
     /**
      * Register local broker to metadata store.
      */
-    void register() throws MetadataStoreException;
+    CompletableFuture<Void> registerAsync();
 
     /**
      * Unregister the broker.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 44f6287b2e1..296d9a77fd7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -69,10 +70,12 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         Init,
         Started,
         Registered,
+        Unregistering,
         Closed
     }
 
-    private State state;
+    @VisibleForTesting
+    final AtomicReference<State> state = new AtomicReference<>(State.Init);
 
     public BrokerRegistryImpl(PulsarService pulsar) {
         this.pulsar = pulsar;
@@ -80,6 +83,13 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         this.brokerLookupDataMetadataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
         this.scheduler = pulsar.getLoadManagerExecutor();
         this.listeners = new ArrayList<>();
+        // The registered node is an ephemeral node that could be deleted when 
the metadata store client's session
+        // is expired. In this case, we should register again.
+        this.listeners.add((broker, notificationType) -> {
+            if (notificationType == NotificationType.Deleted && 
getBrokerId().equals(broker)) {
+                registerAsync();
+            }
+        });
         this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
         this.brokerLookupData = new BrokerLookupData(
                 pulsar.getWebServiceAddress(),
@@ -93,44 +103,45 @@ public class BrokerRegistryImpl implements BrokerRegistry {
                 conf.getLoadManagerClassName(),
                 System.currentTimeMillis(),
                 pulsar.getBrokerVersion());
-        this.state = State.Init;
     }
 
     @Override
     public synchronized void start() throws PulsarServerException {
-        if (this.state != State.Init) {
-            return;
+        if (!this.state.compareAndSet(State.Init, State.Started)) {
+            throw new PulsarServerException("Cannot start the broker registry 
in state " + state.get());
         }
         
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
         try {
-            this.state = State.Started;
-            this.register();
-        } catch (MetadataStoreException e) {
-            throw new PulsarServerException(e);
+            
this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+        } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
+            throw PulsarServerException.from(e);
         }
     }
 
     @Override
     public boolean isStarted() {
-        return this.state == State.Started || this.state == State.Registered;
+        final var state = this.state.get();
+        return state == State.Started || state == State.Registered;
     }
 
     @Override
-    public synchronized void register() throws MetadataStoreException {
-        if (this.state == State.Started) {
-            try {
-                brokerLookupDataMetadataCache.put(brokerIdKeyPath, 
brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
-                        .get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
-                this.state = State.Registered;
-            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                throw MetadataStoreException.unwrap(e);
-            }
+    public CompletableFuture<Void> registerAsync() {
+        final var state = this.state.get();
+        if (state != State.Started && state != State.Registered) {
+            log.info("[{}] Skip registering self because the state is {}", 
getBrokerId(), state);
+            return CompletableFuture.completedFuture(null);
         }
+        log.info("[{}] Started registering self to {} (state: {})", 
getBrokerId(), brokerIdKeyPath, state);
+        return brokerLookupDataMetadataCache.put(brokerIdKeyPath, 
brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
+                .thenAccept(__ -> {
+                    this.state.set(State.Registered);
+                    log.info("[{}] Finished registering self", getBrokerId());
+                });
     }
 
     @Override
     public synchronized void unregister() throws MetadataStoreException {
-        if (this.state == State.Registered) {
+        if (state.compareAndSet(State.Registered, State.Unregistering)) {
             try {
                 brokerLookupDataMetadataCache.delete(brokerIdKeyPath)
                         .get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
@@ -143,7 +154,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
             } catch (InterruptedException | TimeoutException e) {
                 throw MetadataStoreException.unwrap(e);
             } finally {
-                this.state = State.Started;
+                state.set(State.Started);
             }
         }
     }
@@ -190,7 +201,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
 
     @Override
     public synchronized void close() throws PulsarServerException {
-        if (this.state == State.Closed) {
+        if (this.state.get() == State.Closed) {
             return;
         }
         try {
@@ -199,7 +210,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         } catch (Exception ex) {
             log.error("Unexpected error when unregistering the broker 
registry", ex);
         } finally {
-            this.state = State.Closed;
+            this.state.set(State.Closed);
         }
     }
 
@@ -237,7 +248,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
     }
 
     private void checkState() throws IllegalStateException {
-        if (this.state == State.Closed) {
+        if (this.state.get() == State.Closed) {
             throw new IllegalStateException("The registry already closed.");
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
new file mode 100644
index 00000000000..162ea50829d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class BrokerRegistryIntegrationTest {
+
+    private static final String clusterName = "test";
+    private final int zkPort = PortManager.nextFreePort();
+    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 
zkPort, PortManager::nextFreePort);
+    private PulsarService pulsar;
+    private BrokerRegistry brokerRegistry;
+    private String brokerMetadataPath;
+
+    @BeforeClass
+    protected void setup() throws Exception {
+        bk.start();
+        pulsar = new PulsarService(brokerConfig());
+        pulsar.start();
+        final var admin = pulsar.getAdminClient();
+        admin.clusters().createCluster(clusterName, 
ClusterData.builder().build());
+        admin.tenants().createTenant("public", TenantInfo.builder()
+                .allowedClusters(Collections.singleton(clusterName)).build());
+        admin.namespaces().createNamespace("public/default");
+        brokerRegistry = ((ExtensibleLoadManagerWrapper) 
pulsar.getLoadManager().get()).get().getBrokerRegistry();
+        brokerMetadataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + 
pulsar.getBrokerId();
+    }
+
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        if (pulsar != null) {
+            pulsar.close();
+        }
+        bk.stop();
+    }
+
+    @Test
+    public void testRecoverFromNodeDeletion() throws Exception {
+        // Simulate the case that the node was somehow deleted (e.g. by 
session timeout)
+        Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> 
Assert.assertEquals(
+                brokerRegistry.getAvailableBrokersAsync().join(), 
List.of(pulsar.getBrokerId())));
+        pulsar.getLocalMetadataStore().delete(brokerMetadataPath, 
Optional.empty());
+        Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> 
Assert.assertEquals(
+                brokerRegistry.getAvailableBrokersAsync().join(), 
List.of(pulsar.getBrokerId())));
+
+        // If the node is deleted by unregister(), it should not recreate the 
path
+        brokerRegistry.unregister();
+        Thread.sleep(3000);
+        
Assert.assertTrue(brokerRegistry.getAvailableBrokersAsync().get().isEmpty());
+
+        // Restore the normal state
+        brokerRegistry.registerAsync().get();
+        Assert.assertEquals(brokerRegistry.getAvailableBrokersAsync().get(), 
List.of(pulsar.getBrokerId()));
+    }
+
+    @Test
+    public void testRegisterAgain() throws Exception {
+        Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> 
Assert.assertEquals(
+                brokerRegistry.getAvailableBrokersAsync().join(), 
List.of(pulsar.getBrokerId())));
+        final var metadataStore = pulsar.getLocalMetadataStore();
+        final var oldResult = 
metadataStore.get(brokerMetadataPath).get().orElseThrow();
+        log.info("Old result: {} {}", new String(oldResult.getValue()), 
oldResult.getStat().getVersion());
+        brokerRegistry.registerAsync().get();
+
+        Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+            final var newResult = 
metadataStore.get(brokerMetadataPath).get().orElseThrow();
+            log.info("New result: {} {}", new String(newResult.getValue()), 
newResult.getStat().getVersion());
+            Assert.assertTrue(newResult.getStat().getVersion() > 
oldResult.getStat().getVersion());
+            Assert.assertEquals(newResult.getValue(), oldResult.getValue());
+        });
+    }
+
+    private ServiceConfiguration brokerConfig() {
+        final var config = new ServiceConfiguration();
+        config.setClusterName(clusterName);
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(0));
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort());
+        config.setManagedLedgerDefaultWriteQuorum(1);
+        config.setManagedLedgerDefaultAckQuorum(1);
+        config.setManagedLedgerDefaultEnsembleSize(1);
+        config.setDefaultNumberOfNamespaceBundles(16);
+        
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        config.setLoadBalancerDebugModeEnabled(true);
+        config.setBrokerShutdownTimeoutMs(100);
+        return config;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 91ada90dda6..28a2a18500f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -332,7 +332,7 @@ public class BrokerRegistryTest {
         assertEquals(getState(brokerRegistry), 
BrokerRegistryImpl.State.Started);
 
         // Check state after re-register.
-        brokerRegistry.register();
+        brokerRegistry.registerAsync().get();
         assertEquals(getState(brokerRegistry), 
BrokerRegistryImpl.State.Registered);
 
         // Check state after close.
@@ -396,8 +396,8 @@ public class BrokerRegistryTest {
         assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId");
     }
 
-    public BrokerRegistryImpl.State getState(BrokerRegistryImpl 
brokerRegistry) {
-        return WhiteboxImpl.getInternalState(brokerRegistry, 
BrokerRegistryImpl.State.class);
+    private static BrokerRegistryImpl.State getState(BrokerRegistryImpl 
brokerRegistry) {
+        return brokerRegistry.state.get();
     }
 }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index f2d5a682510..7580c165a50 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1181,7 +1181,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                     producer.send("t1");
 
                     // Test re-register broker and check the lookup result
-                    loadManager4.getBrokerRegistry().register();
+                    loadManager4.getBrokerRegistry().registerAsync().get();
 
                     result = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
                     assertNotNull(result);

Reply via email to