This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 113418d Ensure standalone service comes back quickly after ungraceful restarts (#2487) 113418d is described below commit 113418d3e7b4d02597bed7a939a46e5f66829621 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Sep 4 12:34:48 2018 -0700 Ensure standalone service comes back quickly after ungraceful restarts (#2487) * Ensure standalone service comes back quickly after ungraceful restarts * Handle NoNode errors when deleting * Added NoopLoadManager for standalone mode --- conf/standalone.conf | 2 + .../pulsar/broker/loadbalance/NoopLoadManager.java | 158 +++++++++++++++++++++ .../loadbalance/impl/ModularLoadManagerImpl.java | 3 +- .../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 15 +- 4 files changed, 176 insertions(+), 2 deletions(-) diff --git a/conf/standalone.conf b/conf/standalone.conf index 74a5702..755b76e 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -311,6 +311,8 @@ autoSkipNonRecoverableData=false ### --- Load balancer --- ### +loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager + # Enable load balancer loadBalancerEnabled=false diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java new file mode 100644 index 0000000..5773c61 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription; +import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; +import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; + +public class NoopLoadManager implements LoadManager { + + private String lookupServiceAddress; + private ResourceUnit localResourceUnit; + private ZooKeeper zkClient; + + LocalBrokerData localData; + + private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> ObjectMapperFactory + .getThreadLocal() + .readValue(content, LocalBrokerData.class); + + @Override + public void initialize(PulsarService pulsar) { + lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort(); + localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress), + new PulsarResourceDescription()); + zkClient = pulsar.getZkClient(); + + localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), + pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); + } + + @Override + public void start() throws PulsarServerException { + String brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress; + + try { + // When running in standalone, this error can happen when killing the "standalone" process + // ungracefully since the ZK session will not be closed and it will take some time for ZK server + // to prune the expired sessions after startup. + // Since there's a single broker instance running, it's safe, in this mode, to remove the old lock + + // Delete and recreate z-node + try { + if (zkClient.exists(brokerZnodePath, null) != null) { + zkClient.delete(brokerZnodePath, -1); + } + } catch (NoNodeException nne) { + // Ignore if z-node was just expired + } + + ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + } catch (Exception e) { + throw new PulsarServerException(e); + } + } + + @Override + public boolean isCentralized() { + return false; + } + + @Override + public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception { + return Optional.of(localResourceUnit); + } + + @Override + public LoadManagerReport generateLoadReport() throws Exception { + return null; + } + + @Override + public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() { + return loadReportDeserializer; + } + + @Override + public void setLoadReportForceUpdateFlag() { + // do nothing + } + + @Override + public void writeLoadReportOnZookeeper() throws Exception { + // do nothing + } + + @Override + public void writeResourceQuotasToZooKeeper() throws Exception { + // do nothing + } + + @Override + public List<Metrics> getLoadBalancingMetrics() { + return Collections.emptyList(); + } + + @Override + public void doLoadShedding() { + // do nothing + } + + @Override + public void doNamespaceBundleSplit() throws Exception { + // do nothing + } + + @Override + public void disableBroker() throws Exception { + // do nothing + } + + @Override + public Set<String> getAvailableBrokers() throws Exception { + return Collections.singleton(lookupServiceAddress); + } + + @Override + public void stop() throws PulsarServerException { + // do nothing + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 714389f..32742b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -793,7 +793,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach if (ownerZkSessionId != 0 && ownerZkSessionId != zkClient.getSessionId()) { log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath, ownerZkSessionId); - throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId); + throw new PulsarServerException( + "Broker-znode owned by different zk-session " + ownerZkSessionId); } // Node may already be created by another load manager: in this case update the data. zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index 039a8a2..f6ab92c 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -63,6 +63,7 @@ import org.apache.bookkeeper.util.MathUtils; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; @@ -220,9 +221,21 @@ public class LocalBookkeeperEnsemble { cleanDirectory(bkDataDir); } + int bookiePort = initialPort + i; + + // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully + String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort); + if (zkc.exists(registrationZnode, null) != null) { + try { + zkc.delete(registrationZnode, -1); + } catch (NoNodeException nne) { + // Ignore if z-node was just expired + } + } + bsConfs[i] = new ServerConfiguration(baseConf); // override settings - bsConfs[i].setBookiePort(initialPort + i); + bsConfs[i].setBookiePort(bookiePort); bsConfs[i].setZkServers("127.0.0.1:" + ZooKeeperDefaultPort); bsConfs[i].setJournalDirName(bkDataDir.getPath()); bsConfs[i].setLedgerDirNames(new String[] { bkDataDir.getPath() });