This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 034367198b9 [fix][broker] Fail fast if the extensible load manager failed to start (#23297)
034367198b9 is described below

commit 034367198b9716838d5ed82354b6a42b9428c867
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Fri Sep 13 11:14:08 2024 +0800

    [fix][broker] Fail fast if the extensible load manager failed to start (#23297)
    
    (cherry picked from commit fc60ec06ae98fa4000473a636bfb06729c210048)
---
 .../pulsar/broker/PulsarServerException.java       |  17 +++
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../extensions/ExtensibleLoadManagerImpl.java      | 122 +++++++--------------
 .../extensions/LoadManagerFailFastTest.java        | 120 ++++++++++++++++++++
 4 files changed, 179 insertions(+), 82 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
index 2235b9a7128..d7c0d0adb3a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker;
 
 import java.io.IOException;
+import java.util.concurrent.CompletionException;
 
 public class PulsarServerException extends IOException {
     private static final long serialVersionUID = 1;
@@ -44,4 +45,20 @@ public class PulsarServerException extends IOException {
             super(t);
         }
     }
+
+    public static PulsarServerException from(Throwable throwable) {
+        if (throwable instanceof CompletionException) {
+            return from(throwable.getCause());
+        }
+        if (throwable instanceof PulsarServerException pulsarServerException) {
+            return pulsarServerException;
+        } else {
+            return new PulsarServerException(throwable);
+        }
+    }
+
+    // Wrap this checked exception into a specific unchecked exception
+    public static CompletionException toUncheckedException(PulsarServerException e) {
+        return new CompletionException(e);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 1c4d3b168b6..392e536e39d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1001,7 +1001,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             state = State.Started;
         } catch (Exception e) {
             LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
-            PulsarServerException startException = new PulsarServerException(e);
+            PulsarServerException startException = PulsarServerException.from(e);
             readyForIncomingRequestsFuture.completeExceptionally(startException);
             throw startException;
         } finally {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 40efa6390a7..8e34f2f697f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -81,7 +81,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerSche
 import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
 import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
-import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
 import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
 import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory;
@@ -99,10 +98,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.slf4j.Logger;
 
@@ -125,10 +121,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
 
     public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
 
-    public static final int STARTUP_TIMEOUT_SECONDS = 30;
-
-    public static final int MAX_RETRY = 5;
-
     private static final String ELECTION_ROOT = "/loadbalance/extension/leader";
 
     public static final Set<String> INTERNAL_TOPICS =
@@ -212,7 +204,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
 
     private final ConcurrentHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
             lookupRequests = new ConcurrentHashMap<>();
-    private final CompletableFuture<Void> initWaiter = new CompletableFuture<>();
+    private final CompletableFuture<Boolean> initWaiter = new CompletableFuture<>();
 
     /**
      * Get all the bundles that are owned by this broker.
@@ -385,7 +377,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
             return;
         }
         try {
-            this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+            this.brokerRegistry = createBrokerRegistry(pulsar);
             this.leaderElectionService = new LeaderElectionService(
                     pulsar.getCoordinationService(), pulsar.getBrokerId(),
                     pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
@@ -400,53 +392,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
                             });
                         });
                     });
-            this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+            this.serviceUnitStateChannel = createServiceUnitStateChannel(pulsar);
             this.brokerRegistry.start();
             this.splitManager = new SplitManager(splitCounter);
             this.unloadManager = new UnloadManager(unloadCounter, pulsar.getBrokerId());
             this.serviceUnitStateChannel.listen(unloadManager);
             this.serviceUnitStateChannel.listen(splitManager);
             this.leaderElectionService.start();
-            pulsar.runWhenReadyForIncomingRequests(() -> {
-                Backoff backoff = new BackoffBuilder()
-                        .setInitialTime(100, TimeUnit.MILLISECONDS)
-                        .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
-                        .create();
-                int retry = 0;
-                while (!Thread.currentThread().isInterrupted()) {
-                    try {
-                        brokerRegistry.register();
-                        this.serviceUnitStateChannel.start();
-                        break;
-                    } catch (Exception e) {
-                        log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...",
-                                pulsar.getBrokerId(), ++retry, e);
-                        try {
-                            Thread.sleep(backoff.next());
-                        } catch (InterruptedException ex) {
-                            log.warn("Interrupted while sleeping.");
-                            // preserve thread's interrupt status
-                            Thread.currentThread().interrupt();
-                            try {
-                                pulsar.close();
-                            } catch (PulsarServerException exc) {
-                                log.error("Failed to close pulsar service.", exc);
-                            }
-                            return;
-                        }
-                        failStarting(e);
-                        if (retry >= MAX_RETRY) {
-                            log.error("Failed to start the service unit state channel after retry {} th. "
-                                    + "Closing pulsar service.", retry, e);
-                            try {
-                                pulsar.close();
-                            } catch (PulsarServerException ex) {
-                                log.error("Failed to close pulsar service.", ex);
-                            }
-                        }
-                    }
-                }
-            });
+
             this.antiAffinityGroupPolicyHelper =
                     new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
             antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
@@ -455,15 +408,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
             SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
             this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
             this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
-
-            try {
-                this.brokerLoadDataStore = LoadDataStoreFactory
-                        .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
-                this.topBundlesLoadDataStore = LoadDataStoreFactory
-                        .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
-            } catch (LoadDataStoreException e) {
-                throw new PulsarServerException(e);
-            }
+            this.brokerLoadDataStore = LoadDataStoreFactory
+                    .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+            this.topBundlesLoadDataStore = LoadDataStoreFactory
+                    .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
 
             this.context = LoadManagerContextImpl.builder()
                     .configuration(conf)
@@ -487,6 +435,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
 
             pulsar.runWhenReadyForIncomingRequests(() -> {
                 try {
+                    this.serviceUnitStateChannel.start();
                     var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
 
                     this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
@@ -521,38 +470,33 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
                                     MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
 
                     this.splitScheduler.start();
-                    this.initWaiter.complete(null);
+                    this.initWaiter.complete(true);
                     this.started = true;
                     log.info("Started load manager.");
-                } catch (Exception ex) {
-                    failStarting(ex);
+                } catch (Throwable e) {
+                    failStarting(e);
                 }
             });
-        } catch (Exception ex) {
+        } catch (Throwable ex) {
             failStarting(ex);
         }
     }
 
-    private void failStarting(Exception ex) {
-        log.error("Failed to start the extensible load balance and close broker registry {}.",
-                this.brokerRegistry, ex);
+    private void failStarting(Throwable throwable) {
         if (this.brokerRegistry != null) {
             try {
-                brokerRegistry.unregister();
-            } catch (MetadataStoreException e) {
-                // ignore
-            }
-        }
-        if (this.serviceUnitStateChannel != null) {
-            try {
-                serviceUnitStateChannel.close();
-            } catch (IOException e) {
-                // ignore
+                brokerRegistry.close();
+            } catch (PulsarServerException e) {
+                // If close failed, this broker might still exist in the metadata store. Then it could be found by other
+                // brokers as an available broker. Hence, print a warning log for it.
+                log.warn("Failed to close the broker registry: {}", e.getMessage());
             }
         }
-        initWaiter.completeExceptionally(ex);
+        initWaiter.complete(false); // exit the background thread gracefully
+        throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable));
     }
 
+
     @Override
     public void initialize(PulsarService pulsar) {
         this.pulsar = pulsar;
@@ -897,7 +841,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
         boolean becameFollower = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
-                initWaiter.get();
+                if (!initWaiter.get()) {
+                    return;
+                }
                 if (!serviceUnitStateChannel.isChannelOwner()) {
                     becameFollower = true;
                     break;
@@ -947,7 +893,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
         boolean becameLeader = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
-                initWaiter.get();
+                if (!initWaiter.get()) {
+                    return;
+                }
                 if (serviceUnitStateChannel.isChannelOwner()) {
                     becameLeader = true;
                     break;
@@ -1018,7 +966,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
     @VisibleForTesting
     protected void monitor() {
         try {
-            initWaiter.get();
+            if (!initWaiter.get()) {
+                return;
+            }
 
             // Monitor role
             // Periodically check the role in case ZK watcher fails.
@@ -1073,4 +1023,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
             log.warn("Failed to wait for closing internal topics", e);
         }
     }
+
+    @VisibleForTesting
+    protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
+        return new BrokerRegistryImpl(pulsar);
+    }
+
+    @VisibleForTesting
+    protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
+        return new ServiceUnitStateChannelImpl(pulsar);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
new file mode 100644
index 00000000000..a400bf733e5
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.common.util.PortManager;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class LoadManagerFailFastTest {
+
+    private static final String cluster = "test";
+    private final int zkPort = PortManager.nextLockedFreePort();
+    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort);
+    private final ServiceConfiguration config = new ServiceConfiguration();
+
+    @BeforeClass
+    protected void setup() throws Exception {
+        bk.start();
+        config.setClusterName(cluster);
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(0));
+        config.setMetadataStoreUrl("zk:localhost:" + zkPort);
+    }
+
+    @AfterClass
+    protected void cleanup() throws Exception {
+        bk.stop();
+    }
+
+    @Test(timeOut = 30000)
+    public void testBrokerRegistryFailure() throws Exception {
+        config.setLoadManagerClassName(BrokerRegistryLoadManager.class.getName());
+        @Cleanup final var pulsar = new PulsarService(config);
+        try {
+            pulsar.start();
+            Assert.fail();
+        } catch (PulsarServerException e) {
+            Assert.assertNull(e.getCause());
+            Assert.assertEquals(e.getMessage(), "Cannot start BrokerRegistry");
+        }
+        Assert.assertTrue(pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get()
+                .isEmpty());
+    }
+
+    @Test(timeOut = 30000)
+    public void testServiceUnitStateChannelFailure() throws Exception {
+        config.setLoadManagerClassName(ChannelLoadManager.class.getName());
+        @Cleanup final var pulsar = new PulsarService(config);
+        try {
+            pulsar.start();
+            Assert.fail();
+        } catch (PulsarServerException e) {
+            Assert.assertNull(e.getCause());
+            Assert.assertEquals(e.getMessage(), "Cannot start ServiceUnitStateChannel");
+        }
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(pulsar.getLocalMetadataStore()
+                .getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get().isEmpty()));
+    }
+
+    private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl {
+
+        @Override
+        protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
+            final var mockBrokerRegistry = Mockito.mock(BrokerRegistryImpl.class);
+            try {
+                Mockito.doThrow(new PulsarServerException("Cannot start BrokerRegistry")).when(mockBrokerRegistry)
+                        .start();
+            } catch (PulsarServerException e) {
+                throw new RuntimeException(e);
+            }
+            return mockBrokerRegistry;
+        }
+    }
+
+    private static class ChannelLoadManager extends ExtensibleLoadManagerImpl {
+
+        @Override
+        protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
+            final var channel = Mockito.mock(ServiceUnitStateChannelImpl.class);
+            try {
+                Mockito.doThrow(new PulsarServerException("Cannot start ServiceUnitStateChannel")).when(channel)
+                        .start();
+            } catch (PulsarServerException e) {
+                throw new RuntimeException(e);
+            }
+            Mockito.doAnswer(__ -> null).when(channel).listen(Mockito.any());
+            return channel;
+        }
+    }
+}

Reply via email to