This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 53df683b0f7 [fix][broker] Ensure that PulsarService is ready for serving incoming requests (#22977) 53df683b0f7 is described below commit 53df683b0f78f5f7c12f87e6fbb4d73637ca5bd5 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Wed Jun 26 17:54:19 2024 +0300 [fix][broker] Ensure that PulsarService is ready for serving incoming requests (#22977) --- .../org/apache/pulsar/broker/PulsarService.java | 16 ++- .../extensions/ExtensibleLoadManagerImpl.java | 131 ++++++++++++--------- .../pulsar/broker/namespace/NamespaceService.java | 4 +- .../broker/service/PulsarChannelInitializer.java | 7 +- .../apache/pulsar/broker/service/ServerCnx.java | 4 + .../org/apache/pulsar/broker/web/WebService.java | 50 ++++++++ 6 files changed, 156 insertions(+), 56 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 4fa773dace9..0d8bc571c57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -291,6 +291,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ExecutorProvider transactionExecutorProvider; private final DefaultMonotonicSnapshotClock monotonicSnapshotClock; private String brokerId; + private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>(); public enum State { Init, Started, Closing, Closed @@ -999,6 +1000,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { this.metricsGenerator = new MetricsGenerator(this); + // the broker is ready to accept incoming requests by Pulsar binary protocol and http/https + readyForIncomingRequestsFuture.complete(null); + // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, // so that the protocol handlers can access broker service properly. @@ -1047,12 +1051,22 @@ public class PulsarService implements AutoCloseable, ShutdownService { state = State.Started; } catch (Exception e) { LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e); - throw new PulsarServerException(e); + PulsarServerException startException = new PulsarServerException(e); + readyForIncomingRequestsFuture.completeExceptionally(startException); + throw startException; } finally { mutex.unlock(); } } + public void runWhenReadyForIncomingRequests(Runnable runnable) { + readyForIncomingRequestsFuture.thenRun(runnable); + } + + public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException { + readyForIncomingRequestsFuture.get(); + } + protected BrokerInterceptor newBrokerInterceptor() throws IOException { return BrokerInterceptors.load(config); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 92dcf8001ad..4a7ba90aad9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -36,7 +36,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -167,10 +166,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private TopBundleLoadDataReporter topBundleLoadDataReporter; - private ScheduledFuture brokerLoadDataReportTask; - private ScheduledFuture topBundlesLoadDataReportTask; + private volatile ScheduledFuture brokerLoadDataReportTask; + private volatile ScheduledFuture topBundlesLoadDataReportTask; - private ScheduledFuture monitorTask; + private volatile ScheduledFuture monitorTask; private SplitScheduler splitScheduler; private UnloadManager unloadManager; @@ -199,7 +198,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private final ConcurrentHashMap<String, CompletableFuture<Optional<BrokerLookupData>>> lookupRequests = new ConcurrentHashMap<>(); - private final CountDownLatch initWaiter = new CountDownLatch(1); + private final CompletableFuture<Void> initWaiter = new CompletableFuture<>(); /** * Get all the bundles that are owned by this broker. @@ -376,12 +375,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS pulsar.getCoordinationService(), pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, state -> { - pulsar.getLoadManagerExecutor().execute(() -> { - if (state == LeaderElectionState.Leading) { - playLeader(); - } else { - playFollower(); - } + pulsar.runWhenReadyForIncomingRequests(() -> { + pulsar.getLoadManagerExecutor().execute(() -> { + if (state == LeaderElectionState.Leading) { + playLeader(); + } else { + playFollower(); + } + }); }); }); this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); @@ -391,7 +392,13 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); - this.serviceUnitStateChannel.start(); + pulsar.runWhenReadyForIncomingRequests(() -> { + try { + this.serviceUnitStateChannel.start(); + } catch (Exception e) { + failStarting(e); + } + }); this.antiAffinityGroupPolicyHelper = new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); @@ -423,54 +430,72 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore); this.serviceUnitStateChannel.listen(brokerLoadDataReporter); this.serviceUnitStateChannel.listen(topBundleLoadDataReporter); - var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); - this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - try { - brokerLoadDataReporter.reportAsync(false); - // TODO: update broker load metrics using getLocalData - } catch (Throwable e) { - log.error("Failed to run the broker load manager executor job.", e); - } - }, - interval, - interval, TimeUnit.MILLISECONDS); - - this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - try { - // TODO: consider excluding the bundles that are in the process of split. - topBundleLoadDataReporter.reportAsync(false); - } catch (Throwable e) { - log.error("Failed to run the top bundles load manager executor job.", e); - } - }, - interval, - interval, TimeUnit.MILLISECONDS); - - this.monitorTask = this.pulsar.getLoadManagerExecutor() - .scheduleAtFixedRate(() -> { - monitor(); - }, - MONITOR_INTERVAL_IN_MILLIS, - MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); this.unloadScheduler = new UnloadScheduler( pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel, unloadCounter, unloadMetrics); this.splitScheduler = new SplitScheduler( pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); - this.splitScheduler.start(); - this.initWaiter.countDown(); - this.started = true; - log.info("Started load manager."); + + pulsar.runWhenReadyForIncomingRequests(() -> { + try { + var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); + + this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + try { + brokerLoadDataReporter.reportAsync(false); + // TODO: update broker load metrics using getLocalData + } catch (Throwable e) { + log.error("Failed to run the broker load manager executor job.", e); + } + }, + interval, + interval, TimeUnit.MILLISECONDS); + + this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + try { + // TODO: consider excluding the bundles that are in the process of split. + topBundleLoadDataReporter.reportAsync(false); + } catch (Throwable e) { + log.error("Failed to run the top bundles load manager executor job.", e); + } + }, + interval, + interval, TimeUnit.MILLISECONDS); + + this.monitorTask = this.pulsar.getLoadManagerExecutor() + .scheduleAtFixedRate(() -> { + monitor(); + }, + MONITOR_INTERVAL_IN_MILLIS, + MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); + + this.splitScheduler.start(); + this.initWaiter.complete(null); + this.started = true; + log.info("Started load manager."); + } catch (Exception ex) { + failStarting(ex); + } + }); } catch (Exception ex) { - log.error("Failed to start the extensible load balance and close broker registry {}.", - this.brokerRegistry, ex); - if (this.brokerRegistry != null) { + failStarting(ex); + } + } + + private void failStarting(Exception ex) { + log.error("Failed to start the extensible load balance and close broker registry {}.", + this.brokerRegistry, ex); + if (this.brokerRegistry != null) { + try { brokerRegistry.close(); + } catch (PulsarServerException e) { + // ignore } } + initWaiter.completeExceptionally(ex); } @Override @@ -816,7 +841,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.await(); + initWaiter.get(); if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; @@ -866,7 +891,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.await(); + initWaiter.get(); if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; @@ -936,7 +961,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS @VisibleForTesting protected void monitor() { try { - initWaiter.await(); + initWaiter.get(); // Monitor role // Periodically check the role in case ZK watcher fails. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index df6a141ddcf..dfd03dfbc6e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1335,7 +1335,9 @@ public class NamespaceService implements AutoCloseable { bundleOwnershipListeners.add(listener); } } - getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners)); + pulsar.runWhenReadyForIncomingRequests(() -> { + getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners)); + }); } public void addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 5308b3c981e..e276ea24fed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -104,6 +105,9 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> @Override protected void initChannel(SocketChannel ch) throws Exception { + // disable auto read explicitly so that requests aren't served until auto read is enabled + // ServerCnx must enable auto read in channelActive after PulsarService is ready to accept incoming requests + ch.config().setAutoRead(false); ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true)); if (this.enableTls) { if (this.tlsEnabledWithKeyStore) { @@ -128,7 +132,8 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> // ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling // auto-read. ch.pipeline().addLast("flowController", new FlowControlHandler()); - ServerCnx cnx = newServerCnx(pulsar, listenerName); + // using "ChannelHandler" type to workaround an IntelliJ bug that shows a false positive error + ChannelHandler cnx = newServerCnx(pulsar, listenerName); ch.pipeline().addLast("handler", cnx); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index b184f794949..4933aee974d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -369,6 +369,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, this); this.service.getPulsarStats().recordConnectionCreate(); cnxsPerThread.get().add(this); + service.getPulsar().runWhenReadyForIncomingRequests(() -> { + // enable auto read after PulsarService is ready to accept incoming requests + ctx.channel().config().setAutoRead(true); + }); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index bf484d4f41f..c969f40ad43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -20,12 +20,21 @@ package org.apache.pulsar.broker.web; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.jetty.JettyStatisticsCollector; +import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import javax.servlet.DispatcherType; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletResponse; import lombok.Getter; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -232,6 +241,7 @@ public class WebService implements AutoCloseable { private final FilterHolder authenticationFilterHolder; FilterInitializer(PulsarService pulsarService) { ServiceConfiguration config = pulsarService.getConfiguration(); + if (config.getMaxConcurrentHttpRequests() > 0) { FilterHolder filterHolder = new FilterHolder(QoSFilter.class); filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests())); @@ -243,6 +253,10 @@ public class WebService implements AutoCloseable { new RateLimitingFilter(config.getHttpRequestsMaxPerSecond()))); } + // wait until the PulsarService is ready to serve incoming requests + filterHolders.add( + new FilterHolder(new WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter(pulsarService))); + boolean brokerInterceptorEnabled = pulsarService.getBrokerInterceptor() != null; if (brokerInterceptorEnabled) { ExceptionHandler handler = new ExceptionHandler(); @@ -284,6 +298,42 @@ public class WebService implements AutoCloseable { } } + // Filter that waits until the PulsarService is ready to serve incoming requests + private static class WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter implements Filter { + private final PulsarService pulsarService; + + public WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter(PulsarService pulsarService) { + this.pulsarService = pulsarService; + } + + @Override + public void init(FilterConfig filterConfig) throws ServletException { + + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + try { + // Wait until the PulsarService is ready to serve incoming requests + pulsarService.waitUntilReadyForIncomingRequests(); + } catch (ExecutionException e) { + ((HttpServletResponse) response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, + "PulsarService failed to start."); + return; + } catch (InterruptedException e) { + ((HttpServletResponse) response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, + "PulsarService is not ready."); + return; + } + chain.doFilter(request, response); + } + + @Override + public void destroy() { + + } + } } public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,