sijie closed pull request #2935: Ensure broker is fully boostrapped before load 
manager register itself
URL: https://github.com/apache/pulsar/pull/2935
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
index 5dfba9468c..c80f0a5471 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.configuration;
 
 import java.io.File;
+import java.util.function.Supplier;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.GET;
@@ -34,6 +35,7 @@
 public class VipStatus {
 
     public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
+    public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
 
     @Context
     protected ServletContext servletContext;
@@ -41,11 +43,15 @@
     @GET
     @Context
     public String checkStatus() {
-
         String statusFilePath = (String) 
servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
+        @SuppressWarnings("unchecked")
+        Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) 
servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
+
+        boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
+
         if (statusFilePath != null) {
             File statusFile = new File(statusFilePath);
-            if (statusFile.exists() && statusFile.isFile()) {
+            if (isReady && statusFile.exists() && statusFile.isFile()) {
                 return "OK";
             }
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d5c7105afc..8925c072d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -172,7 +172,7 @@
         Init, Started, Closed
     }
 
-    private State state;
+    private volatile State state;
 
     private final ReentrantLock mutex = new ReentrantLock();
     private final Condition isClosedCondition = mutex.newCondition();
@@ -355,8 +355,6 @@ public void start() throws PulsarServerException {
             // Start load management service (even if load balancing is 
disabled)
             this.loadManager.set(LoadManager.create(this));
 
-            this.startLoadManagementService();
-
             // needs load management service
             this.startNamespaceService();
 
@@ -369,6 +367,13 @@ public void start() throws PulsarServerException {
             attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this);
             Map<String, Object> vipAttributeMap = Maps.newHashMap();
             vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, 
this.config.getStatusFilePath());
+            vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new 
Supplier<Boolean>() {
+                @Override
+                public Boolean get() {
+                    // Ensure the VIP status is only visible when the broker 
is fully initialized
+                    return state == State.Started;
+                }
+            });
             this.webService.addRestResources("/", 
VipStatus.class.getPackage().getName(), false, vipAttributeMap);
             this.webService.addRestResources("/", 
"org.apache.pulsar.broker.web", false, attributeMap);
             this.webService.addRestResources("/admin", 
"org.apache.pulsar.broker.admin.v1", true, attributeMap);
@@ -446,11 +451,16 @@ public synchronized void brokerIsAFollowerNow() {
 
             leaderElectionService.start();
 
+            schemaRegistryService = SchemaRegistryService.create(this);
+
             webService.start();
 
             this.metricsGenerator = new MetricsGenerator(this);
 
-            schemaRegistryService = SchemaRegistryService.create(this);
+            // By starting the Load manager service, the broker will also 
become visible
+            // to the rest of the broker by creating the registration z-node. 
This needs
+            // to be done only when the broker is fully operative.
+            this.startLoadManagementService();
 
             state = State.Started;
 
@@ -810,12 +820,12 @@ public synchronized PulsarAdmin getAdminClient() throws 
PulsarServerException {
                         .authentication( //
                                 conf.getBrokerClientAuthenticationPlugin(), //
                                 
conf.getBrokerClientAuthenticationParameters());
-                
+
                 if (conf.isBrokerClientTlsEnabled()) {
                     
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
                     
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
                 }
-                
+
                 this.adminClient = builder.build();
                 LOG.info("Admin api url: " + adminApiUrl);
             } catch (Exception e) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to