sijie closed pull request #2935: Ensure broker is fully bootstrapped before load manager registers itself URL: https://github.com/apache/pulsar/pull/2935
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 5dfba9468c..c80f0a5471 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -19,6 +19,7 @@ package org.apache.pulsar.common.configuration; import java.io.File; +import java.util.function.Supplier; import javax.servlet.ServletContext; import javax.ws.rs.GET; @@ -34,6 +35,7 @@ public class VipStatus { public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; + public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe"; @Context protected ServletContext servletContext; @@ -41,11 +43,15 @@ @GET @Context public String checkStatus() { - String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); + @SuppressWarnings("unchecked") + Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); + + boolean isReady = isReadyProbe != null ? 
isReadyProbe.get() : true; + if (statusFilePath != null) { File statusFile = new File(statusFilePath); - if (statusFile.exists() && statusFile.isFile()) { + if (isReady && statusFile.exists() && statusFile.isFile()) { return "OK"; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index d5c7105afc..8925c072d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -172,7 +172,7 @@ Init, Started, Closed } - private State state; + private volatile State state; private final ReentrantLock mutex = new ReentrantLock(); private final Condition isClosedCondition = mutex.newCondition(); @@ -355,8 +355,6 @@ public void start() throws PulsarServerException { // Start load management service (even if load balancing is disabled) this.loadManager.set(LoadManager.create(this)); - this.startLoadManagementService(); - // needs load management service this.startNamespaceService(); @@ -369,6 +367,13 @@ public void start() throws PulsarServerException { attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this); Map<String, Object> vipAttributeMap = Maps.newHashMap(); vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath()); + vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier<Boolean>() { + @Override + public Boolean get() { + // Ensure the VIP status is only visible when the broker is fully initialized + return state == State.Started; + } + }); this.webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap); this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap); this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap); @@ -446,11 +451,16 @@ public synchronized void brokerIsAFollowerNow() { 
leaderElectionService.start(); + schemaRegistryService = SchemaRegistryService.create(this); + webService.start(); this.metricsGenerator = new MetricsGenerator(this); - schemaRegistryService = SchemaRegistryService.create(this); + // By starting the Load manager service, the broker will also become visible + // to the rest of the broker by creating the registration z-node. This needs + // to be done only when the broker is fully operative. + this.startLoadManagementService(); state = State.Started; @@ -810,12 +820,12 @@ public synchronized PulsarAdmin getAdminClient() throws PulsarServerException { .authentication( // conf.getBrokerClientAuthenticationPlugin(), // conf.getBrokerClientAuthenticationParameters()); - + if (conf.isBrokerClientTlsEnabled()) { builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath()); builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection()); } - + this.adminClient = builder.build(); LOG.info("Admin api url: " + adminApiUrl); } catch (Exception e) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services