This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 29af3ae6d4c2f12f7b61eb58ac4a88501e305be9
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Aug 24 09:51:21 2022 +0800

    [fix][broker] Fix pulsarLedgerIdGenerator can't delete index path when zk 
metadata store config rootPath. (#17192)
---
 .../bookkeeper/PulsarLedgerIdGenerator.java        |  17 +++-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |   9 ++
 .../bookkeeper/PulsarLedgerIdGeneratorTest.java    | 112 +++++++++++++++++++--
 3 files changed, 130 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
index 29a65b84d88..0e1703cf343 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 
 @Slf4j
 public class PulsarLedgerIdGenerator implements LedgerIdGenerator {
@@ -104,7 +105,7 @@ public class PulsarLedgerIdGenerator implements 
LedgerIdGenerator {
                 EnumSet.of(CreateOption.Ephemeral, CreateOption.Sequential))
                 .thenCompose(stat -> {
                     // delete the znode for id generation
-                    store.delete(stat.getPath(), Optional.empty()).
+                    store.delete(handleTheDeletePath(stat.getPath()), 
Optional.empty()).
                             exceptionally(ex -> {
                                 log.warn("Exception during deleting node for 
id generation: ", ex);
                                 return null;
@@ -235,7 +236,7 @@ public class PulsarLedgerIdGenerator implements 
LedgerIdGenerator {
                 .put(prefix, new byte[0], Optional.of(-1L), 
EnumSet.of(CreateOption.Ephemeral, CreateOption.Sequential))
                 .thenCompose(stat -> {
                     // delete the znode for id generation
-                    store.delete(stat.getPath(), Optional.empty()).
+                    store.delete(handleTheDeletePath(stat.getPath()), 
Optional.empty()).
                             exceptionally(ex -> {
                                 log.warn("Exception during deleting node for 
id generation: ", ex);
                                 return null;
@@ -287,4 +288,16 @@ public class PulsarLedgerIdGenerator implements 
LedgerIdGenerator {
         return ledgerIdGenPath + "/" + "ID-";
     }
 
+    // When the ZK metadata store is configured with a chroot (rootPath), the path in the returned
+    // stat carries that prefix, so it must be stripped before the path is handed back to the store.
+    private String handleTheDeletePath(String path) {
+        if (store instanceof ZKMetadataStore) {
+            String rootPath = ((ZKMetadataStore) store).getRootPath();
+            // Literal prefix match: replaceFirst() would read rootPath as a regex and match anywhere.
+            if (rootPath != null && path.startsWith(rootPath)) {
+                return path.substring(rootPath.length());
+            }
+        }
+        return path;
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 87b6be283fc..ad23faea25e 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -63,6 +63,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ConnectStringParser;
 
 @Slf4j
 public class ZKMetadataStore extends AbstractBatchedMetadataStore
@@ -71,6 +72,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
     public static final String ZK_SCHEME_IDENTIFIER = "zk:";
 
     private final String zkConnectString;
+    private final String rootPath;
     private final MetadataStoreConfig metadataStoreConfig;
     private final boolean isZkManaged;
     private final ZooKeeper zkc;
@@ -87,6 +89,8 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                 this.zkConnectString = metadataURL;
             }
             this.metadataStoreConfig = metadataStoreConfig;
+            this.rootPath = new 
ConnectStringParser(zkConnectString).getChrootPath();
+
             isZkManaged = true;
             zkc = 
PulsarZooKeeperClient.newBuilder().connectString(zkConnectString)
                     .connectRetryPolicy(new 
BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE))
@@ -121,6 +125,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
         super(config);
 
         this.zkConnectString = null;
+        this.rootPath = null;
         this.metadataStoreConfig = null;
         this.isZkManaged = false;
         this.zkc = zkc;
@@ -580,4 +585,8 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
             throw KeeperException.create(Code.get(rc.get()));
         }
     }
+
+    public String getRootPath() {
+        return rootPath; // may be null: the constructor that assigns an externally provided zkc sets it to null
+    }
 }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
index 2238bcf6160..c67daec93bb 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
@@ -62,7 +62,8 @@ public class PulsarLedgerIdGeneratorTest extends 
BaseMetadataStoreTest {
         CountDownLatch countDownLatch1 = new CountDownLatch(nThread * 
nLedgers);
 
         final AtomicInteger errCount = new AtomicInteger(0);
-        final ConcurrentLinkedQueue<Long> ledgerIds = new 
ConcurrentLinkedQueue<Long>();
+        final ConcurrentLinkedQueue<Long> shortLedgerIds = new 
ConcurrentLinkedQueue<Long>();
+        final ConcurrentLinkedQueue<Long> longLedgerIds = new 
ConcurrentLinkedQueue<Long>();
 
         long start = System.currentTimeMillis();
 
@@ -74,7 +75,7 @@ public class PulsarLedgerIdGeneratorTest extends 
BaseMetadataStoreTest {
                 for (int j = 0; j < nLedgers; j++) {
                     ledgerIdGenerator.generateLedgerId((rc, result) -> {
                         if (KeeperException.Code.OK.intValue() == rc) {
-                            ledgerIds.add(result);
+                            shortLedgerIds.add(result);
                         } else {
                             errCount.incrementAndGet();
                         }
@@ -96,7 +97,7 @@ public class PulsarLedgerIdGeneratorTest extends 
BaseMetadataStoreTest {
                 for (int j = 0; j < nLedgers; j++) {
                     ledgerIdGenerator.generateLedgerId((rc, result) -> {
                         if (KeeperException.Code.OK.intValue() == rc) {
-                            ledgerIds.add(result);
+                            longLedgerIds.add(result);
                         } else {
                             errCount.incrementAndGet();
                         }
@@ -108,19 +109,118 @@ public class PulsarLedgerIdGeneratorTest extends 
BaseMetadataStoreTest {
 
         assertTrue(countDownLatch2.await(120, TimeUnit.SECONDS),
                 "Wait ledger id generation threads to stop timeout : ");
-        log.info("Number of generated ledger id: {}, time used: {}", 
ledgerIds.size(),
+        log.info("Number of generated ledger id: {}, time used: {}", 
shortLedgerIds.size() + longLedgerIds.size(),
                 System.currentTimeMillis() - start);
         assertEquals(errCount.get(), 0, "Error occur during ledger id 
generation : ");
 
         Set<Long> ledgers = new HashSet<>();
-        while (!ledgerIds.isEmpty()) {
-            Long ledger = ledgerIds.poll();
+        while (!shortLedgerIds.isEmpty()) {
+            Long ledger = shortLedgerIds.poll();
+            assertNotNull(ledger, "Generated ledger id is null");
+            assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "] 
conflict : ");
+            ledgers.add(ledger);
+        }
+        while (!longLedgerIds.isEmpty()) {
+            Long ledger = longLedgerIds.poll();
+            assertNotNull(ledger, "Generated ledger id is null");
+            assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "] 
conflict : ");
+            ledgers.add(ledger);
+        }
+    }
+
+    @Test
+    public void testGenerateLedgerIdWithZkPrefix() throws Exception {
+        @Cleanup
+        MetadataStoreExtended store =
+                MetadataStoreExtended.create(zks.getConnectionString() + 
"/test", MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        PulsarLedgerIdGenerator ledgerIdGenerator = new 
PulsarLedgerIdGenerator(store, "/ledgers");
+        // Create *nThread* threads each generate *nLedgers* ledger id,
+        // and then check there is no identical ledger id.
+        final int nThread = 2;
+        final int nLedgers = 2000;
+        // Each phase (short ids first, then long ids) waits on its own latch of nThread * nLedgers.
+        CountDownLatch countDownLatch1 = new CountDownLatch(nThread * 
nLedgers);
+
+        final AtomicInteger errCount = new AtomicInteger(0);
+        final ConcurrentLinkedQueue<Long> shortLedgerIds = new 
ConcurrentLinkedQueue<Long>();
+        final ConcurrentLinkedQueue<Long> longLedgerIds = new 
ConcurrentLinkedQueue<Long>();
+
+        long start = System.currentTimeMillis();
+
+        @Cleanup(value = "shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        for (int i = 0; i < nThread; i++) {
+            executor.submit(() -> {
+                for (int j = 0; j < nLedgers; j++) {
+                    ledgerIdGenerator.generateLedgerId((rc, result) -> {
+                        if (KeeperException.Code.OK.intValue() == rc) {
+                            shortLedgerIds.add(result);
+                        } else {
+                            errCount.incrementAndGet();
+                        }
+                        countDownLatch1.countDown();
+                    });
+                }
+            });
+        }
+
+        countDownLatch1.await();
+        for (Long ledgerId : shortLedgerIds) {
+            assertFalse(store.exists("/ledgers/idgen/ID-" + 
String.format("%010d", ledgerId)).get(),
+                    "Exception during deleting node for id generation : ");
+        }
+        CountDownLatch countDownLatch2 = new CountDownLatch(nThread * 
nLedgers);
+
+        // Go and create the long-id directory in zookeeper. This should cause 
the id generator to generate ids with the
+        // new algo once we clear its stored status.
+        store.put("/ledgers/idgen-long", new byte[0], Optional.empty()).join();
+
+        for (int i = 0; i < nThread; i++) {
+            executor.submit(() -> {
+                for (int j = 0; j < nLedgers; j++) {
+                    ledgerIdGenerator.generateLedgerId((rc, result) -> {
+                        if (KeeperException.Code.OK.intValue() == rc) {
+                            longLedgerIds.add(result);
+                        } else {
+                            errCount.incrementAndGet();
+                        }
+                        countDownLatch2.countDown();
+                    });
+                }
+            });
+        }
+
+        assertTrue(countDownLatch2.await(120, TimeUnit.SECONDS),
+                "Wait ledger id generation threads to stop timeout : ");
+        // e.g. /test/ledgers/idgen-long/HOB-0000000001/ID-0000000000
+        for (Long ledgerId : longLedgerIds) {
+            assertFalse(store.exists("/ledgers/idgen-long/HOB-0000000001/ID-" 
+ String.format("%010d", ledgerId >> 32)).get(),
+                    "Exception during deleting node for id generation : ");
+        }
+
+        log.info("Number of generated ledger id: {}, time used: {}", 
shortLedgerIds.size() + longLedgerIds.size(),
+                System.currentTimeMillis() - start);
+        assertEquals(errCount.get(), 0, "Error occur during ledger id 
generation : ");
+
+        Set<Long> ledgers = new HashSet<>();
+        while (!shortLedgerIds.isEmpty()) {
+            Long ledger = shortLedgerIds.poll();
+            assertNotNull(ledger, "Generated ledger id is null");
+            assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "] 
conflict : ");
+            ledgers.add(ledger);
+        }
+        while (!longLedgerIds.isEmpty()) {
+            Long ledger = longLedgerIds.poll();
             assertNotNull(ledger, "Generated ledger id is null");
             assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "] 
conflict : ");
             ledgers.add(ledger);
         }
     }
 
+
     @Test(dataProvider = "impl")
     public void testEnsureCounterIsNotResetWithContainerNodes(String provider, 
Supplier<String> urlSupplier)
             throws Exception {

Reply via email to