This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7efce98 Ensure broker is fully boostrapped before load manager register itself (#2935) 7efce98 is described below commit 7efce9845d2b5847e18ae7f51cae887f9a4d0668 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Nov 20 12:11:17 2018 -0800 Ensure broker is fully boostrapped before load manager register itself (#2935) ### Motivation In some cases the broker can immediately get assigned traffic before it's fully bootstrapped. This happens because the load manager is registering the broker in ZK before some of the initialization steps are completed. This results in an NPE, like: ``` Caused by: java.lang.NullPointerException at org.apache.pulsar.broker.service.persistent.PersistentTopic.hasSchema(PersistentTopic.java:1815) ~[org.apache.pulsar-pulsar-broker-2.2.0-streamlio-22.jar:2.2.0-streamlio-22] at org.apache.pulsar.broker.service.ServerCnx.lambda$25(ServerCnx.java:836) ~[org.apache.pulsar-pulsar-broker-2.2.0-streamlio-22.jar:2.2.0-streamlio-22] at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_181] ``` ### Modifications * Register the broker in ZK only after the full start sequence has been done. This will ensure other brokers will not discover this broker before it's ready. * Expose the "is ready" state in the VipStatus -- This will be used to make sure the load balancer will not direct any lookup request to the broker before it's ready. 
--- .../pulsar/common/configuration/VipStatus.java | 10 ++++++++-- .../org/apache/pulsar/broker/PulsarService.java | 22 ++++++++++++++++------ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 5dfba94..c80f0a5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -19,6 +19,7 @@ package org.apache.pulsar.common.configuration; import java.io.File; +import java.util.function.Supplier; import javax.servlet.ServletContext; import javax.ws.rs.GET; @@ -34,6 +35,7 @@ import javax.ws.rs.core.Response.Status; public class VipStatus { public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; + public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe"; @Context protected ServletContext servletContext; @@ -41,11 +43,15 @@ public class VipStatus { @GET @Context public String checkStatus() { - String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); + @SuppressWarnings("unchecked") + Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); + + boolean isReady = isReadyProbe != null ? 
isReadyProbe.get() : true; + if (statusFilePath != null) { File statusFile = new File(statusFilePath); - if (statusFile.exists() && statusFile.isFile()) { + if (isReady && statusFile.exists() && statusFile.isFile()) { return "OK"; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index d5c7105..8925c07 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -172,7 +172,7 @@ public class PulsarService implements AutoCloseable { Init, Started, Closed } - private State state; + private volatile State state; private final ReentrantLock mutex = new ReentrantLock(); private final Condition isClosedCondition = mutex.newCondition(); @@ -355,8 +355,6 @@ public class PulsarService implements AutoCloseable { // Start load management service (even if load balancing is disabled) this.loadManager.set(LoadManager.create(this)); - this.startLoadManagementService(); - // needs load management service this.startNamespaceService(); @@ -369,6 +367,13 @@ public class PulsarService implements AutoCloseable { attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this); Map<String, Object> vipAttributeMap = Maps.newHashMap(); vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath()); + vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier<Boolean>() { + @Override + public Boolean get() { + // Ensure the VIP status is only visible when the broker is fully initialized + return state == State.Started; + } + }); this.webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap); this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap); this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap); @@ -446,11 +451,16 @@ 
public class PulsarService implements AutoCloseable { leaderElectionService.start(); + schemaRegistryService = SchemaRegistryService.create(this); + webService.start(); this.metricsGenerator = new MetricsGenerator(this); - schemaRegistryService = SchemaRegistryService.create(this); + // By starting the Load manager service, the broker will also become visible + // to the rest of the broker by creating the registration z-node. This needs + // to be done only when the broker is fully operative. + this.startLoadManagementService(); state = State.Started; @@ -810,12 +820,12 @@ public class PulsarService implements AutoCloseable { .authentication( // conf.getBrokerClientAuthenticationPlugin(), // conf.getBrokerClientAuthenticationParameters()); - + if (conf.isBrokerClientTlsEnabled()) { builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath()); builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection()); } - + this.adminClient = builder.build(); LOG.info("Admin api url: " + adminApiUrl); } catch (Exception e) {