This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new bdc92c0ca62 [fix][broker] read local cookie when start pulsar standalone (#18260)
bdc92c0ca62 is described below

commit bdc92c0ca62cc4cac995a0e132d9308485773521
Author: labuladong <[email protected]>
AuthorDate: Tue Nov 1 12:45:24 2022 +0800

    [fix][broker] read local cookie when start pulsar standalone (#18260)
---
 .../java/org/apache/pulsar/PulsarStandalone.java   |  3 +-
 .../org/apache/pulsar/PulsarStandaloneTest.java    | 37 ++++++++-
 ...pulsar_broker_test_standalone_with_rocksdb.conf | 97 ++++++++++++++++++++++
 .../pulsar/metadata/bookkeeper/BKCluster.java      | 19 +++++
 4 files changed, 154 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 1191b98f8c6..8c5f9e899b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -438,7 +438,8 @@ public class PulsarStandalone implements AutoCloseable {
         }
     }
 
-    private void startBookieWithMetadataStore() throws Exception {
+    @VisibleForTesting
+    void startBookieWithMetadataStore() throws Exception {
         if (StringUtils.isBlank(metadataStoreUrl)){
             log.info("Starting BK with RocksDb metadata store");
             metadataStoreUrl = "rocksdb://" + Paths.get(metadataDir).toAbsolutePath();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
index b7b62eccb51..f4c34bbc96c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar;
 
+import static org.apache.commons.io.FileUtils.cleanDirectory;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.eq;
@@ -25,7 +26,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import java.io.File;
+import java.util.List;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.IOUtils;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -33,6 +37,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TenantResources;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -82,4 +87,34 @@ public class PulsarStandaloneTest {
         verify(nsr, times(1)).createPolicies(eq(ns), any());
     }
 
+    @Test(groups = "broker")
+    public void testStandaloneWithRocksDB() throws Exception {
+        String[] args = new String[]{"--config",
+                "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"};
+        final int bookieNum = 3;
+        final File tempDir = IOUtils.createTempDir("standalone", "test");
+
+        PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
+        standalone.setBkDir(tempDir.getAbsolutePath());
+        standalone.setNumOfBk(bookieNum);
+
+        standalone.startBookieWithMetadataStore();
+        List<ServerConfiguration> firstBsConfs = standalone.bkCluster.getBsConfs();
+        Assert.assertEquals(firstBsConfs.size(), bookieNum);
+        standalone.close();
+
+        // start twice, read cookie from local folder
+        standalone.startBookieWithMetadataStore();
+        List<ServerConfiguration> secondBsConfs = standalone.bkCluster.getBsConfs();
+        Assert.assertEquals(secondBsConfs.size(), bookieNum);
+
+        for (int i = 0; i < bookieNum; i++) {
+            ServerConfiguration conf1 = firstBsConfs.get(i);
+            ServerConfiguration conf2 = secondBsConfs.get(i);
+            Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort());
+        }
+        standalone.close();
+        cleanDirectory(tempDir);
+    }
+
 }
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
new file mode 100644
index 00000000000..d8b26bbbfa9
--- /dev/null
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+applicationName="pulsar_broker"
+metadataStoreUrl=
+configurationMetadataStoreUrl=
+brokerServicePort=6650
+brokerServicePortTls=6651
+webServicePort=8080
+allowLoopback=true
+webServicePortTls=4443
+bindAddress=0.0.0.0
+advertisedAddress=
+advertisedListeners=
+internalListenerName=internal
+clusterName="test_cluster"
+brokerShutdownTimeoutMs=3000
+backlogQuotaCheckEnabled=true
+backlogQuotaCheckIntervalInSeconds=60
+backlogQuotaDefaultLimitGB=50
+brokerDeleteInactiveTopicsEnabled=true
+brokerDeleteInactiveTopicsFrequencySeconds=60
+allowAutoTopicCreation=true
+allowAutoTopicCreationType=non-partitioned
+defaultNumPartitions=1
+messageExpiryCheckIntervalInMinutes=5
+clientLibraryVersionCheckEnabled=false
+clientLibraryVersionCheckAllowUnversioned=true
+statusFilePath=/tmp/status.html
+tlsEnabled=false
+tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
+tlsKeyFilePath=/home/local/conf/pulsar/server.key
+tlsTrustCertsFilePath=
+tlsAllowInsecureConnection=false
+authenticationEnabled=false
+authorizationEnabled=false
+superUserRoles="test_user"
+brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
+brokerClientAuthenticationParameters=
+bookkeeperClientAuthenticationPlugin="test_auth_plugin"
+bookkeeperClientAuthenticationAppId="test_auth_id"
+bookkeeperClientTimeoutInSeconds=30
+bookkeeperClientSpeculativeReadTimeoutInMillis=0
+bookkeeperClientHealthCheckEnabled=true
+bookkeeperClientHealthCheckIntervalSeconds=60
+bookkeeperClientHealthCheckErrorThresholdPerInterval=5
+bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800
+bookkeeperClientRackawarePolicyEnabled=true
+bookkeeperClientRegionawarePolicyEnabled=false
+bookkeeperClientMinNumRacksPerWriteQuorum=2
+bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
+bookkeeperClientReorderReadSequenceEnabled=false
+bookkeeperClientIsolationGroups="test_group"
+managedLedgerDefaultEnsembleSize=3
+managedLedgerDefaultWriteQuorum=2
+managedLedgerDefaultAckQuorum=2
+managedLedgerCacheSizeMB=1024
+managedLedgerCacheEvictionWatermark=10
+managedLedgerDefaultMarkDeleteRateLimit=0.1
+managedLedgerMaxEntriesPerLedger=50000
+managedLedgerMinLedgerRolloverTimeMinutes=10
+managedLedgerMaxLedgerRolloverTimeMinutes=240
+managedLedgerCursorMaxEntriesPerLedger=50000
+managedLedgerCursorRolloverTimeInSeconds = 14400
+managedLedgerDataReadPriority = bookkeeper-first
+loadBalancerEnabled = false
+loadBalancerReportUpdateThresholdPercentage=10
+loadBalancerReportUpdateMaxIntervalMinutes=15
+loadBalancerHostUsageCheckIntervalMinutes=1
+loadBalancerSheddingIntervalMinutes=30
+loadBalancerSheddingGracePeriodMinutes=30
+loadBalancerBrokerUnderloadedThresholdPercentage=50
+loadBalancerBrokerOverloadedThresholdPercentage=85
+replicationMetricsEnabled=true
+replicationConnectionsPerBroker=16
+replicationProducerQueueSize=1000
+replicatorPrefix=pulsar.repl
+brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
+supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
+defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
+maxMessagePublishBufferSizeInMB=-1
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 53825cae2b2..f1e259f0abc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -30,9 +30,12 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.Cookie;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.common.component.ComponentStarter;
@@ -65,6 +68,7 @@ public class BKCluster implements AutoCloseable {
     // BookKeeper related variables
     private final List<File> tmpDirs = new ArrayList<>();
     private final List<LifecycleComponentStack> bookieComponents = new ArrayList<>();
+    @Getter
     private final List<ServerConfiguration> bsConfs = new ArrayList<>();
 
     protected final ServerConfiguration baseConf;
@@ -230,9 +234,24 @@ public class BKCluster implements AutoCloseable {
             // and 2nd bookie's cookie validation fails
             port = clusterConf.bkPort;
         }
+        File[] cookieDir = dataDir.listFiles((file) -> file.getName().equals("current"));
+        if (cookieDir != null && cookieDir.length > 0) {
+            String existBookieAddr = parseBookieAddressFromCookie(cookieDir[0]);
+            if (existBookieAddr != null) {
+                baseConf.setAdvertisedAddress(existBookieAddr.split(":")[0]);
+                port = Integer.parseInt(existBookieAddr.split(":")[1]);
+            }
+        }
         return newServerConfiguration(port, dataDir, new File[]{dataDir});
     }
 
+    private String parseBookieAddressFromCookie(File dir) throws IOException {
+        Cookie cookie = Cookie.readFromDirectory(dir);
+        Pattern pattern = Pattern.compile(".*bookieHost: \"(.*?)\".*", Pattern.DOTALL);
+        Matcher m = pattern.matcher(cookie.toString());
+        return m.find() ? m.group(1) : null;
+    }
+
     private ClientConfiguration newClientConfiguration() {
         return new ClientConfiguration(baseConf);
     }

Reply via email to