This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7efce98  Ensure broker is fully bootstrapped before load manager 
registers itself (#2935)
7efce98 is described below

commit 7efce9845d2b5847e18ae7f51cae887f9a4d0668
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Nov 20 12:11:17 2018 -0800

    Ensure broker is fully bootstrapped before load manager registers itself 
(#2935)
    
    ### Motivation
    
    In some cases the broker can immediately get assigned traffic before it's 
fully bootstrapped.
    
    This happens because the load manager is registering the broker in ZK 
before some of the initialization steps are completed.
    
    This results in an NPE, such as:
    
    ```
    Caused by: java.lang.NullPointerException
        at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.hasSchema(PersistentTopic.java:1815)
 ~[org.apache.pulsar-pulsar-broker-2.2.0-streamlio-22.jar:2.2.0-streamlio-22]
        at 
org.apache.pulsar.broker.service.ServerCnx.lambda$25(ServerCnx.java:836) 
~[org.apache.pulsar-pulsar-broker-2.2.0-streamlio-22.jar:2.2.0-streamlio-22]
        at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) 
~[?:1.8.0_181]
    ```
    
    ### Modifications
    
     * Register the broker in ZK only after the full start sequence has been 
done. This will ensure other brokers will not discover this broker before it's 
ready.
     * Expose the "is ready" state in the VipStatus -- This will be used to 
make sure the load balancer will not direct any lookup request to the broker 
before it's ready.
---
 .../pulsar/common/configuration/VipStatus.java     | 10 ++++++++--
 .../org/apache/pulsar/broker/PulsarService.java    | 22 ++++++++++++++++------
 2 files changed, 24 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
index 5dfba94..c80f0a5 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.configuration;
 
 import java.io.File;
+import java.util.function.Supplier;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.GET;
@@ -34,6 +35,7 @@ import javax.ws.rs.core.Response.Status;
 public class VipStatus {
 
     public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
+    public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
 
     @Context
     protected ServletContext servletContext;
@@ -41,11 +43,15 @@ public class VipStatus {
     @GET
     @Context
     public String checkStatus() {
-
         String statusFilePath = (String) 
servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
+        @SuppressWarnings("unchecked")
+        Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) 
servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
+
+        boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
+
         if (statusFilePath != null) {
             File statusFile = new File(statusFilePath);
-            if (statusFile.exists() && statusFile.isFile()) {
+            if (isReady && statusFile.exists() && statusFile.isFile()) {
                 return "OK";
             }
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d5c7105..8925c07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -172,7 +172,7 @@ public class PulsarService implements AutoCloseable {
         Init, Started, Closed
     }
 
-    private State state;
+    private volatile State state;
 
     private final ReentrantLock mutex = new ReentrantLock();
     private final Condition isClosedCondition = mutex.newCondition();
@@ -355,8 +355,6 @@ public class PulsarService implements AutoCloseable {
             // Start load management service (even if load balancing is 
disabled)
             this.loadManager.set(LoadManager.create(this));
 
-            this.startLoadManagementService();
-
             // needs load management service
             this.startNamespaceService();
 
@@ -369,6 +367,13 @@ public class PulsarService implements AutoCloseable {
             attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this);
             Map<String, Object> vipAttributeMap = Maps.newHashMap();
             vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, 
this.config.getStatusFilePath());
+            vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new 
Supplier<Boolean>() {
+                @Override
+                public Boolean get() {
+                    // Ensure the VIP status is only visible when the broker 
is fully initialized
+                    return state == State.Started;
+                }
+            });
             this.webService.addRestResources("/", 
VipStatus.class.getPackage().getName(), false, vipAttributeMap);
             this.webService.addRestResources("/", 
"org.apache.pulsar.broker.web", false, attributeMap);
             this.webService.addRestResources("/admin", 
"org.apache.pulsar.broker.admin.v1", true, attributeMap);
@@ -446,11 +451,16 @@ public class PulsarService implements AutoCloseable {
 
             leaderElectionService.start();
 
+            schemaRegistryService = SchemaRegistryService.create(this);
+
             webService.start();
 
             this.metricsGenerator = new MetricsGenerator(this);
 
-            schemaRegistryService = SchemaRegistryService.create(this);
+            // By starting the Load manager service, the broker will also 
become visible
+            // to the rest of the broker by creating the registration z-node. 
This needs
+            // to be done only when the broker is fully operative.
+            this.startLoadManagementService();
 
             state = State.Started;
 
@@ -810,12 +820,12 @@ public class PulsarService implements AutoCloseable {
                         .authentication( //
                                 conf.getBrokerClientAuthenticationPlugin(), //
                                 
conf.getBrokerClientAuthenticationParameters());
-                
+
                 if (conf.isBrokerClientTlsEnabled()) {
                     
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
                     
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
                 }
-                
+
                 this.adminClient = builder.build();
                 LOG.info("Admin api url: " + adminApiUrl);
             } catch (Exception e) {

Reply via email to