This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 29af3ae6d4c2f12f7b61eb58ac4a88501e305be9 Author: Yan Zhao <[email protected]> AuthorDate: Wed Aug 24 09:51:21 2022 +0800 [fix][broker] Fix pulsarLedgerIdGenerator can't delete index path when zk metadata store config rootPath. (#17192) --- .../bookkeeper/PulsarLedgerIdGenerator.java | 17 +++- .../pulsar/metadata/impl/ZKMetadataStore.java | 9 ++ .../bookkeeper/PulsarLedgerIdGeneratorTest.java | 112 +++++++++++++++++++-- 3 files changed, 130 insertions(+), 8 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java index 29a65b84d88..0e1703cf343 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java @@ -31,6 +31,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; @Slf4j public class PulsarLedgerIdGenerator implements LedgerIdGenerator { @@ -104,7 +105,7 @@ public class PulsarLedgerIdGenerator implements LedgerIdGenerator { EnumSet.of(CreateOption.Ephemeral, CreateOption.Sequential)) .thenCompose(stat -> { // delete the znode for id generation - store.delete(stat.getPath(), Optional.empty()). + store.delete(handleTheDeletePath(stat.getPath()), Optional.empty()). exceptionally(ex -> { log.warn("Exception during deleting node for id generation: ", ex); return null; @@ -235,7 +236,7 @@ public class PulsarLedgerIdGenerator implements LedgerIdGenerator { .put(prefix, new byte[0], Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral, CreateOption.Sequential)) .thenCompose(stat -> { // delete the znode for id generation - store.delete(stat.getPath(), Optional.empty()). + store.delete(handleTheDeletePath(stat.getPath()), Optional.empty()). exceptionally(ex -> { log.warn("Exception during deleting node for id generation: ", ex); return null; @@ -287,4 +288,16 @@ public class PulsarLedgerIdGenerator implements LedgerIdGenerator { return ledgerIdGenPath + "/" + "ID-"; } + //If the config rootPath when use zk metadata store, it will append rootPath as the prefix of the path. + //So when we get the path from the stat, we should truncate the rootPath. + private String handleTheDeletePath(String path) { + if (store instanceof ZKMetadataStore) { + String rootPath = ((ZKMetadataStore) store).getRootPath(); + if (rootPath == null) { + return path; + } + return path.replaceFirst(rootPath, ""); + } + return path; + } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 87b6be283fc..ad23faea25e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -63,6 +63,7 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ConnectStringParser; @Slf4j public class ZKMetadataStore extends AbstractBatchedMetadataStore @@ -71,6 +72,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore public static final String ZK_SCHEME_IDENTIFIER = "zk:"; private final String zkConnectString; + private final String rootPath; private final MetadataStoreConfig metadataStoreConfig; private final boolean isZkManaged; private final ZooKeeper zkc; @@ -87,6 +89,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore this.zkConnectString = metadataURL; } this.metadataStoreConfig = metadataStoreConfig; + this.rootPath = new ConnectStringParser(zkConnectString).getChrootPath(); + isZkManaged = true; zkc = PulsarZooKeeperClient.newBuilder().connectString(zkConnectString) .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE)) @@ -121,6 +125,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore super(config); this.zkConnectString = null; + this.rootPath = null; this.metadataStoreConfig = null; this.isZkManaged = false; this.zkc = zkc; @@ -580,4 +585,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore throw KeeperException.create(Code.get(rc.get())); } } + + public String getRootPath() { + return rootPath; + } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java index 2238bcf6160..c67daec93bb 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java @@ -62,7 +62,8 @@ public class PulsarLedgerIdGeneratorTest extends BaseMetadataStoreTest { CountDownLatch countDownLatch1 = new CountDownLatch(nThread * nLedgers); final AtomicInteger errCount = new AtomicInteger(0); - final ConcurrentLinkedQueue<Long> ledgerIds = new ConcurrentLinkedQueue<Long>(); + final ConcurrentLinkedQueue<Long> shortLedgerIds = new ConcurrentLinkedQueue<Long>(); + final ConcurrentLinkedQueue<Long> longLedgerIds = new ConcurrentLinkedQueue<Long>(); long start = System.currentTimeMillis(); @@ -74,7 +75,7 @@ public class PulsarLedgerIdGeneratorTest extends BaseMetadataStoreTest { for (int j = 0; j < nLedgers; j++) { ledgerIdGenerator.generateLedgerId((rc, result) -> { if (KeeperException.Code.OK.intValue() == rc) { - ledgerIds.add(result); + shortLedgerIds.add(result); } else { errCount.incrementAndGet(); } @@ -96,7 +97,7 @@ public class PulsarLedgerIdGeneratorTest extends BaseMetadataStoreTest { for (int j = 0; j < nLedgers; j++) { ledgerIdGenerator.generateLedgerId((rc, result) -> { if (KeeperException.Code.OK.intValue() == rc) { - ledgerIds.add(result); + longLedgerIds.add(result); } else { errCount.incrementAndGet(); } @@ -108,19 +109,118 @@ public class PulsarLedgerIdGeneratorTest extends BaseMetadataStoreTest { assertTrue(countDownLatch2.await(120, TimeUnit.SECONDS), "Wait ledger id generation threads to stop timeout : "); - log.info("Number of generated ledger id: {}, time used: {}", ledgerIds.size(), + log.info("Number of generated ledger id: {}, time used: {}", shortLedgerIds.size() + longLedgerIds.size(), System.currentTimeMillis() - start); assertEquals(errCount.get(), 0, "Error occur during ledger id generation : "); Set<Long> ledgers = new HashSet<>(); - while (!ledgerIds.isEmpty()) { - Long ledger = ledgerIds.poll(); + while (!shortLedgerIds.isEmpty()) { + Long ledger = shortLedgerIds.poll(); + assertNotNull(ledger, "Generated ledger id is null"); + assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "] conflict : "); + ledgers.add(ledger); + } + while (!longLedgerIds.isEmpty()) { + Long ledger = longLedgerIds.poll(); + assertNotNull(ledger, "Generated ledger id is null"); + assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "] conflict : "); + ledgers.add(ledger); + } + } + + @Test + public void testGenerateLedgerIdWithZkPrefix() throws Exception { + @Cleanup + MetadataStoreExtended store = + MetadataStoreExtended.create(zks.getConnectionString() + "/test", MetadataStoreConfig.builder().build()); + + @Cleanup + PulsarLedgerIdGenerator ledgerIdGenerator = new PulsarLedgerIdGenerator(store, "/ledgers"); + // Create *nThread* threads each generate *nLedgers* ledger id, + // and then check there is no identical ledger id. + final int nThread = 2; + final int nLedgers = 2000; + // Multiply by two. We're going to do half in the old legacy space and half in the new. + CountDownLatch countDownLatch1 = new CountDownLatch(nThread * nLedgers); + + final AtomicInteger errCount = new AtomicInteger(0); + final ConcurrentLinkedQueue<Long> shortLedgerIds = new ConcurrentLinkedQueue<Long>(); + final ConcurrentLinkedQueue<Long> longLedgerIds = new ConcurrentLinkedQueue<Long>(); + + long start = System.currentTimeMillis(); + + @Cleanup(value = "shutdownNow") + ExecutorService executor = Executors.newCachedThreadPool(); + + for (int i = 0; i < nThread; i++) { + executor.submit(() -> { + for (int j = 0; j < nLedgers; j++) { + ledgerIdGenerator.generateLedgerId((rc, result) -> { + if (KeeperException.Code.OK.intValue() == rc) { + shortLedgerIds.add(result); + } else { + errCount.incrementAndGet(); + } + countDownLatch1.countDown(); + }); + } + }); + } + + countDownLatch1.await(); + for (Long ledgerId : shortLedgerIds) { + assertFalse(store.exists("/ledgers/idgen/ID-" + String.format("%010d", ledgerId)).get(), + "Exception during deleting node for id generation : "); + } + CountDownLatch countDownLatch2 = new CountDownLatch(nThread * nLedgers); + + // Go and create the long-id directory in zookeeper. This should cause the id generator to generate ids with the + // new algo once we clear it's stored status. + store.put("/ledgers/idgen-long", new byte[0], Optional.empty()).join(); + + for (int i = 0; i < nThread; i++) { + executor.submit(() -> { + for (int j = 0; j < nLedgers; j++) { + ledgerIdGenerator.generateLedgerId((rc, result) -> { + if (KeeperException.Code.OK.intValue() == rc) { + longLedgerIds.add(result); + } else { + errCount.incrementAndGet(); + } + countDownLatch2.countDown(); + }); + } + }); + } + + assertTrue(countDownLatch2.await(120, TimeUnit.SECONDS), + "Wait ledger id generation threads to stop timeout : "); + ///test/ledgers/idgen-long/HOB-0000000001/ID-0000000000 + for (Long ledgerId : longLedgerIds) { + assertFalse(store.exists("/ledgers/idgen-long/HOB-0000000001/ID-" + String.format("%010d", ledgerId >> 32)).get(), + "Exception during deleting node for id generation : "); + } + + log.info("Number of generated ledger id: {}, time used: {}", shortLedgerIds.size() + longLedgerIds.size(), + System.currentTimeMillis() - start); + assertEquals(errCount.get(), 0, "Error occur during ledger id generation : "); + + Set<Long> ledgers = new HashSet<>(); + while (!shortLedgerIds.isEmpty()) { + Long ledger = shortLedgerIds.poll(); + assertNotNull(ledger, "Generated ledger id is null"); + assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "] conflict : "); + ledgers.add(ledger); + } + while (!longLedgerIds.isEmpty()) { + Long ledger = longLedgerIds.poll(); assertNotNull(ledger, "Generated ledger id is null"); assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "] conflict : "); ledgers.add(ledger); } } + @Test(dataProvider = "impl") public void testEnsureCounterIsNotResetWithContainerNodes(String provider, Supplier<String> urlSupplier) throws Exception {
