This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 26c2837cb9e374942494e7f32f2d4d8f6df480f6 Author: lipenghui <peng...@apache.org> AuthorDate: Tue Apr 27 23:57:34 2021 +0800 Fix schema ledger deletion when deleting topic with delete schema. (#10383) * Fix schema ledger deletion when delete topic with delete schema. * Revert public * Apply comments. * Apply comment. * Fix checkstyle. * Fix test (cherry picked from commit a22782490bb9a17411b749326d1f084b096998c8) --- .../service/schema/BookkeeperSchemaStorage.java | 77 ++++++++++---------- .../java/org/apache/pulsar/schema/SchemaTest.java | 83 +++++++++++++++++++--- 2 files changed, 114 insertions(+), 46 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 2ecc927..521db90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -41,7 +41,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.validation.constraints.NotNull; @@ -83,10 +82,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage { private final ServiceConfiguration config; private BookKeeper bookKeeper; - // schemaId => ledgers of the schemaId - private final Map<String, List<Long>> schemaLedgers = new ConcurrentHashMap<>(); - - private final ConcurrentMap<String, CompletableFuture<StoredSchema>> readSchemaOperations = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, CompletableFuture<StoredSchema>> readSchemaOperations = + new ConcurrentHashMap<>(); @VisibleForTesting BookkeeperSchemaStorage(PulsarService pulsar) { @@ -160,7 +157,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage { return result; } - private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) { + CompletableFuture<Optional<LocatorEntry>> getLocator(String key) { return getSchemaLocator(getSchemaPath(key)); } @@ -168,8 +165,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage { localZkCache.invalidate(getSchemaPath(key)); } - @VisibleForTesting - List<Long> getSchemaLedgerList(String key) throws IOException { + public List<Long> getSchemaLedgerList(String key) throws IOException { Optional<LocatorEntry> locatorEntry = null; try { locatorEntry = getLocator(key).get(); @@ -390,33 +386,44 @@ public class BookkeeperSchemaStorage implements SchemaStorage { } else { // The version is only for the compatibility of the current interface final long version = -1; - final List<Long> ledgerIds = schemaLedgers.get(schemaId); - if (ledgerIds != null) { - CompletableFuture<Long> future = new CompletableFuture<>(); - final AtomicInteger numOfLedgerIds = new AtomicInteger(ledgerIds.size()); - for (long ledgerId : ledgerIds) { - bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> { - if (rc != BKException.Code.OK) { - // It's not a serious error, we didn't need call future.completeExceptionally() - log.warn("Failed to delete ledger {} of {}: {}", ledgerId, schemaId, rc); - } - if (numOfLedgerIds.decrementAndGet() == 0) { - try { - ZkUtils.deleteFullPathOptimistic(zooKeeper, getSchemaPath(schemaId), -1); - } catch (InterruptedException | KeeperException e) { - future.completeExceptionally(e); + CompletableFuture<Long> future = new CompletableFuture<>(); + getLocator(schemaId).whenComplete((locator, ex) -> { + if (ex != null) { + future.completeExceptionally(ex); + } else { + if (!locator.isPresent()) { + future.complete(null); + return; + } + List<SchemaStorageFormat.IndexEntry> indexEntryList = locator.get().locator.getIndexList(); + List<CompletableFuture<Void>> deleteFutures = new ArrayList<>(indexEntryList.size()); + indexEntryList.forEach(indexEntry -> { + final long ledgerId = indexEntry.getPosition().getLedgerId(); + CompletableFuture<Void> deleteFuture = new CompletableFuture<>(); + deleteFutures.add(deleteFuture); + bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> { + if (rc != BKException.Code.OK) { + // It's not a serious error, we didn't need call future.completeExceptionally() + log.warn("Failed to delete ledger {} of {}: {}", ledgerId, schemaId, rc); } - clearLocatorCache(getSchemaPath(schemaId)); - future.complete(version); - } - }, null); + deleteFuture.complete(null); + }, null); + }); + FutureUtil.waitForAll(deleteFutures).whenComplete((v, e) -> { + final String path = getSchemaPath(schemaId); + ZkUtils.asyncDeleteFullPathOptimistic(zooKeeper, path, -1, (rc, path1, ctx) -> { + if (rc != Code.OK.intValue()) { + future.completeExceptionally(KeeperException.create(Code.get(rc))); + } else { + clearLocatorCache(getSchemaPath(schemaId)); + future.complete(version); + } + }, path); + + }); } - return future; - } else { - // It should never reach here - log.warn("No ledgers for schema id: {}", schemaId); - return completedFuture(version); - } + }); + return future; } }); } @@ -578,10 +585,6 @@ public class BookkeeperSchemaStorage implements SchemaStorage { if (rc != BKException.Code.OK) { future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1)); } else { - schemaLedgers.computeIfAbsent( - schemaId, - key -> Collections.synchronizedList(new ArrayList<>()) - ).add(handle.getId()); future.complete(handle); } }, null, metadata); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 595da2b..d72cbf1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -20,13 +20,23 @@ package org.apache.pulsar.schema; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import com.google.common.collect.Sets; import java.util.Collections; - +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; +import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -39,14 +49,11 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Sets; - -import lombok.extern.slf4j.Slf4j; - @Slf4j public class SchemaTest extends MockedPulsarServiceBaseTest { @@ -206,4 +213,62 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } } } + + @Test + public void testDeleteTopicAndSchema() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicName = "test-delete-topic-and-schema"; + + final String topic = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicName).toString(); + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME)); + + @Cleanup + Producer<Schemas.PersonOne> p1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class)) + .topic(topic) + .create(); + + @Cleanup + Producer<Schemas.PersonThree> p2 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class)) + .topic(topic) + .create(); + + List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures = + this.pulsar.getSchemaRegistryService().getAllSchemas(TopicName.get(topic).getSchemaName()).get(); + FutureUtil.waitForAll(schemaFutures).get(); + List<SchemaRegistry.SchemaAndMetadata> schemas = schemaFutures.stream().map(future -> { + try { + return future.get(); + } catch (Exception e) { + return null; + } + }).collect(Collectors.toList()); + + assertEquals(schemas.size(), 2); + for (SchemaRegistry.SchemaAndMetadata schema : schemas) { + assertNotNull(schema); + } + + List<Long> ledgers = ((BookkeeperSchemaStorage)this.pulsar.getSchemaStorage()) + .getSchemaLedgerList(TopicName.get(topic).getSchemaName()); + assertEquals(ledgers.size(), 2); + admin.topics().delete(topic, true, true); + assertEquals(this.pulsar.getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topic).getSchemaName()).get().size(), 0); + + for (Long ledger : ledgers) { + try { + pulsar.getBookKeeperClient().openLedger(ledger, BookKeeper.DigestType.CRC32, new byte[]{}); + fail(); + } catch (BKException.BKNoSuchLedgerExistsException ignore) { + } + } + } }