This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 53df683b0f7 [fix][broker] Ensure that PulsarService is ready for serving incoming requests (#22977)
53df683b0f7 is described below

commit 53df683b0f78f5f7c12f87e6fbb4d73637ca5bd5
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Jun 26 17:54:19 2024 +0300

    [fix][broker] Ensure that PulsarService is ready for serving incoming requests (#22977)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  16 ++-
 .../extensions/ExtensibleLoadManagerImpl.java      | 131 ++++++++++++---------
 .../pulsar/broker/namespace/NamespaceService.java  |   4 +-
 .../broker/service/PulsarChannelInitializer.java   |   7 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   4 +
 .../org/apache/pulsar/broker/web/WebService.java   |  50 ++++++++
 6 files changed, 156 insertions(+), 56 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4fa773dace9..0d8bc571c57 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -291,6 +291,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private final ExecutorProvider transactionExecutorProvider;
     private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
     private String brokerId;
+    private final CompletableFuture<Void> readyForIncomingRequestsFuture = new 
CompletableFuture<>();
 
     public enum State {
         Init, Started, Closing, Closed
@@ -999,6 +1000,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
             this.metricsGenerator = new MetricsGenerator(this);
 
+            // the broker is ready to accept incoming requests by Pulsar 
binary protocol and http/https
+            readyForIncomingRequestsFuture.complete(null);
+
             // Initialize the message protocol handlers.
             // start the protocol handlers only after the broker is ready,
             // so that the protocol handlers can access broker service 
properly.
@@ -1047,12 +1051,22 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             state = State.Started;
         } catch (Exception e) {
             LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
-            throw new PulsarServerException(e);
+            PulsarServerException startException = new 
PulsarServerException(e);
+            
readyForIncomingRequestsFuture.completeExceptionally(startException);
+            throw startException;
         } finally {
             mutex.unlock();
         }
     }
 
+    public void runWhenReadyForIncomingRequests(Runnable runnable) {
+        readyForIncomingRequestsFuture.thenRun(runnable);
+    }
+
+    public void waitUntilReadyForIncomingRequests() throws ExecutionException, 
InterruptedException {
+        readyForIncomingRequestsFuture.get();
+    }
+
     protected BrokerInterceptor newBrokerInterceptor() throws IOException {
         return BrokerInterceptors.load(config);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 92dcf8001ad..4a7ba90aad9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -36,7 +36,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -167,10 +166,10 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     private TopBundleLoadDataReporter topBundleLoadDataReporter;
 
-    private ScheduledFuture brokerLoadDataReportTask;
-    private ScheduledFuture topBundlesLoadDataReportTask;
+    private volatile ScheduledFuture brokerLoadDataReportTask;
+    private volatile ScheduledFuture topBundlesLoadDataReportTask;
 
-    private ScheduledFuture monitorTask;
+    private volatile ScheduledFuture monitorTask;
     private SplitScheduler splitScheduler;
 
     private UnloadManager unloadManager;
@@ -199,7 +198,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     private final ConcurrentHashMap<String, 
CompletableFuture<Optional<BrokerLookupData>>>
             lookupRequests = new ConcurrentHashMap<>();
-    private final CountDownLatch initWaiter = new CountDownLatch(1);
+    private final CompletableFuture<Void> initWaiter = new 
CompletableFuture<>();
 
     /**
      * Get all the bundles that are owned by this broker.
@@ -376,12 +375,14 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                     pulsar.getCoordinationService(), pulsar.getBrokerId(),
                     pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
                     state -> {
-                        pulsar.getLoadManagerExecutor().execute(() -> {
-                            if (state == LeaderElectionState.Leading) {
-                                playLeader();
-                            } else {
-                                playFollower();
-                            }
+                        pulsar.runWhenReadyForIncomingRequests(() -> {
+                            pulsar.getLoadManagerExecutor().execute(() -> {
+                                if (state == LeaderElectionState.Leading) {
+                                    playLeader();
+                                } else {
+                                    playFollower();
+                                }
+                            });
                         });
                     });
             this.serviceUnitStateChannel = new 
ServiceUnitStateChannelImpl(pulsar);
@@ -391,7 +392,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
             this.serviceUnitStateChannel.listen(unloadManager);
             this.serviceUnitStateChannel.listen(splitManager);
             this.leaderElectionService.start();
-            this.serviceUnitStateChannel.start();
+            pulsar.runWhenReadyForIncomingRequests(() -> {
+                try {
+                    this.serviceUnitStateChannel.start();
+                } catch (Exception e) {
+                    failStarting(e);
+                }
+            });
             this.antiAffinityGroupPolicyHelper =
                     new AntiAffinityGroupPolicyHelper(pulsar, 
serviceUnitStateChannel);
             antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
@@ -423,54 +430,72 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                     new TopBundleLoadDataReporter(pulsar, 
brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
             this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
             this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
-            var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
-            this.brokerLoadDataReportTask = 
this.pulsar.getLoadManagerExecutor()
-                    .scheduleAtFixedRate(() -> {
-                                try {
-                                    brokerLoadDataReporter.reportAsync(false);
-                                    // TODO: update broker load metrics using 
getLocalData
-                                } catch (Throwable e) {
-                                    log.error("Failed to run the broker load 
manager executor job.", e);
-                                }
-                            },
-                            interval,
-                            interval, TimeUnit.MILLISECONDS);
-
-            this.topBundlesLoadDataReportTask = 
this.pulsar.getLoadManagerExecutor()
-                    .scheduleAtFixedRate(() -> {
-                                try {
-                                    // TODO: consider excluding the bundles 
that are in the process of split.
-                                    
topBundleLoadDataReporter.reportAsync(false);
-                                } catch (Throwable e) {
-                                    log.error("Failed to run the top bundles 
load manager executor job.", e);
-                                }
-                            },
-                            interval,
-                            interval, TimeUnit.MILLISECONDS);
-
-            this.monitorTask = this.pulsar.getLoadManagerExecutor()
-                    .scheduleAtFixedRate(() -> {
-                                monitor();
-                            },
-                            MONITOR_INTERVAL_IN_MILLIS,
-                            MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
 
             this.unloadScheduler = new UnloadScheduler(
                     pulsar, pulsar.getLoadManagerExecutor(), unloadManager, 
context,
                     serviceUnitStateChannel, unloadCounter, unloadMetrics);
             this.splitScheduler = new SplitScheduler(
                     pulsar, serviceUnitStateChannel, splitManager, 
splitCounter, splitMetrics, context);
-            this.splitScheduler.start();
-            this.initWaiter.countDown();
-            this.started = true;
-            log.info("Started load manager.");
+
+            pulsar.runWhenReadyForIncomingRequests(() -> {
+                try {
+                    var interval = 
conf.getLoadBalancerReportUpdateMinIntervalMillis();
+
+                    this.brokerLoadDataReportTask = 
this.pulsar.getLoadManagerExecutor()
+                            .scheduleAtFixedRate(() -> {
+                                        try {
+                                            
brokerLoadDataReporter.reportAsync(false);
+                                            // TODO: update broker load 
metrics using getLocalData
+                                        } catch (Throwable e) {
+                                            log.error("Failed to run the 
broker load manager executor job.", e);
+                                        }
+                                    },
+                                    interval,
+                                    interval, TimeUnit.MILLISECONDS);
+
+                    this.topBundlesLoadDataReportTask = 
this.pulsar.getLoadManagerExecutor()
+                            .scheduleAtFixedRate(() -> {
+                                        try {
+                                            // TODO: consider excluding the 
bundles that are in the process of split.
+                                            
topBundleLoadDataReporter.reportAsync(false);
+                                        } catch (Throwable e) {
+                                            log.error("Failed to run the top 
bundles load manager executor job.", e);
+                                        }
+                                    },
+                                    interval,
+                                    interval, TimeUnit.MILLISECONDS);
+
+                    this.monitorTask = this.pulsar.getLoadManagerExecutor()
+                            .scheduleAtFixedRate(() -> {
+                                        monitor();
+                                    },
+                                    MONITOR_INTERVAL_IN_MILLIS,
+                                    MONITOR_INTERVAL_IN_MILLIS, 
TimeUnit.MILLISECONDS);
+
+                    this.splitScheduler.start();
+                    this.initWaiter.complete(null);
+                    this.started = true;
+                    log.info("Started load manager.");
+                } catch (Exception ex) {
+                    failStarting(ex);
+                }
+            });
         } catch (Exception ex) {
-            log.error("Failed to start the extensible load balance and close 
broker registry {}.",
-                    this.brokerRegistry, ex);
-            if (this.brokerRegistry != null) {
+            failStarting(ex);
+        }
+    }
+
+    private void failStarting(Exception ex) {
+        log.error("Failed to start the extensible load balance and close 
broker registry {}.",
+                this.brokerRegistry, ex);
+        if (this.brokerRegistry != null) {
+            try {
                 brokerRegistry.close();
+            } catch (PulsarServerException e) {
+                // ignore
             }
         }
+        initWaiter.completeExceptionally(ex);
     }
 
     @Override
@@ -816,7 +841,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         boolean becameFollower = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
-                initWaiter.await();
+                initWaiter.get();
                 if (!serviceUnitStateChannel.isChannelOwner()) {
                     becameFollower = true;
                     break;
@@ -866,7 +891,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         boolean becameLeader = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
-                initWaiter.await();
+                initWaiter.get();
                 if (serviceUnitStateChannel.isChannelOwner()) {
                     becameLeader = true;
                     break;
@@ -936,7 +961,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     @VisibleForTesting
     protected void monitor() {
         try {
-            initWaiter.await();
+            initWaiter.get();
 
             // Monitor role
             // Periodically check the role in case ZK watcher fails.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index df6a141ddcf..dfd03dfbc6e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1335,7 +1335,9 @@ public class NamespaceService implements AutoCloseable {
                 bundleOwnershipListeners.add(listener);
             }
         }
-        getOwnedServiceUnits().forEach(bundle -> 
notifyNamespaceBundleOwnershipListener(bundle, listeners));
+        pulsar.runWhenReadyForIncomingRequests(() -> {
+            getOwnedServiceUnits().forEach(bundle -> 
notifyNamespaceBundleOwnershipListener(bundle, listeners));
+        });
     }
 
     public void 
addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 5308b3c981e..e276ea24fed 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -104,6 +105,9 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
+        // disable auto read explicitly so that requests aren't served until 
auto read is enabled
+        // ServerCnx must enable auto read in channelActive after 
PulsarService is ready to accept incoming requests
+        ch.config().setAutoRead(false);
         ch.pipeline().addLast("consolidation", new 
FlushConsolidationHandler(1024, true));
         if (this.enableTls) {
             if (this.tlsEnabledWithKeyStore) {
@@ -128,7 +132,8 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         // ServerCnx ends up reading higher number of messages and broker can 
not throttle the messages by disabling
         // auto-read.
         ch.pipeline().addLast("flowController", new FlowControlHandler());
-        ServerCnx cnx = newServerCnx(pulsar, listenerName);
+        // using "ChannelHandler" type to workaround an IntelliJ bug that 
shows a false positive error
+        ChannelHandler cnx = newServerCnx(pulsar, listenerName);
         ch.pipeline().addLast("handler", cnx);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b184f794949..4933aee974d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -369,6 +369,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, 
this);
         this.service.getPulsarStats().recordConnectionCreate();
         cnxsPerThread.get().add(this);
+        service.getPulsar().runWhenReadyForIncomingRequests(() -> {
+            // enable auto read after PulsarService is ready to accept 
incoming requests
+            ctx.channel().config().setAutoRead(true);
+        });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index bf484d4f41f..c969f40ad43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -20,12 +20,21 @@ package org.apache.pulsar.broker.web;
 
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
 import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -232,6 +241,7 @@ public class WebService implements AutoCloseable {
         private final FilterHolder authenticationFilterHolder;
         FilterInitializer(PulsarService pulsarService) {
             ServiceConfiguration config = pulsarService.getConfiguration();
+
             if (config.getMaxConcurrentHttpRequests() > 0) {
                 FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
                 filterHolder.setInitParameter("maxRequests", 
String.valueOf(config.getMaxConcurrentHttpRequests()));
@@ -243,6 +253,10 @@ public class WebService implements AutoCloseable {
                         new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
             }
 
+            // wait until the PulsarService is ready to serve incoming requests
+            filterHolders.add(
+                    new FilterHolder(new 
WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter(pulsarService)));
+
             boolean brokerInterceptorEnabled = 
pulsarService.getBrokerInterceptor() != null;
             if (brokerInterceptorEnabled) {
                 ExceptionHandler handler = new ExceptionHandler();
@@ -284,6 +298,42 @@ public class WebService implements AutoCloseable {
             }
         }
 
+        // Filter that waits until the PulsarService is ready to serve 
incoming requests
+        private static class 
WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter implements Filter {
+            private final PulsarService pulsarService;
+
+            public 
WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter(PulsarService 
pulsarService) {
+                this.pulsarService = pulsarService;
+            }
+
+            @Override
+            public void init(FilterConfig filterConfig) throws 
ServletException {
+
+            }
+
+            @Override
+            public void doFilter(ServletRequest request, ServletResponse 
response, FilterChain chain)
+                    throws IOException, ServletException {
+                try {
+                    // Wait until the PulsarService is ready to serve incoming 
requests
+                    pulsarService.waitUntilReadyForIncomingRequests();
+                } catch (ExecutionException e) {
+                    ((HttpServletResponse) 
response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
+                            "PulsarService failed to start.");
+                    return;
+                } catch (InterruptedException e) {
+                    ((HttpServletResponse) 
response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
+                            "PulsarService is not ready.");
+                    return;
+                }
+                chain.doFilter(request, response);
+            }
+
+            @Override
+            public void destroy() {
+
+            }
+        }
     }
 
     public void addServlet(String path, ServletHolder servletHolder, boolean 
requiresAuthentication,

Reply via email to