This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 26c2837cb9e374942494e7f32f2d4d8f6df480f6
Author: lipenghui <peng...@apache.org>
AuthorDate: Tue Apr 27 23:57:34 2021 +0800

    Fix schema ledger deletion when deleting topic with delete schema. (#10383)
    
    * Fix schema ledger deletion when deleting topic with delete schema.
    
    * Revert public
    
    * Apply comments.
    
    * Apply comment.
    
    * Fix checkstyle.
    
    * Fix test
    
    (cherry picked from commit a22782490bb9a17411b749326d1f084b096998c8)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 77 ++++++++++----------
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 83 +++++++++++++++++++---
 2 files changed, 114 insertions(+), 46 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 2ecc927..521db90 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -41,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 
@@ -83,10 +82,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
-    // schemaId => ledgers of the schemaId
-    private final Map<String, List<Long>> schemaLedgers = new 
ConcurrentHashMap<>();
-
-    private final ConcurrentMap<String, CompletableFuture<StoredSchema>> 
readSchemaOperations = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, CompletableFuture<StoredSchema>> 
readSchemaOperations =
+            new ConcurrentHashMap<>();
 
     @VisibleForTesting
     BookkeeperSchemaStorage(PulsarService pulsar) {
@@ -160,7 +157,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         return result;
     }
 
-    private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
+    CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
         return getSchemaLocator(getSchemaPath(key));
     }
 
@@ -168,8 +165,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         localZkCache.invalidate(getSchemaPath(key));
     }
 
-    @VisibleForTesting
-    List<Long> getSchemaLedgerList(String key) throws IOException {
+    public List<Long> getSchemaLedgerList(String key) throws IOException {
         Optional<LocatorEntry> locatorEntry = null;
         try {
             locatorEntry = getLocator(key).get();
@@ -390,33 +386,44 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             } else {
                 // The version is only for the compatibility of the current 
interface
                 final long version = -1;
-                final List<Long> ledgerIds = schemaLedgers.get(schemaId);
-                if (ledgerIds != null) {
-                    CompletableFuture<Long> future = new CompletableFuture<>();
-                    final AtomicInteger numOfLedgerIds = new 
AtomicInteger(ledgerIds.size());
-                    for (long ledgerId : ledgerIds) {
-                        bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object 
cnx) -> {
-                            if (rc != BKException.Code.OK) {
-                                // It's not a serious error, we didn't need 
call future.completeExceptionally()
-                                log.warn("Failed to delete ledger {} of {}: 
{}", ledgerId, schemaId, rc);
-                            }
-                            if (numOfLedgerIds.decrementAndGet() == 0) {
-                                try {
-                                    
ZkUtils.deleteFullPathOptimistic(zooKeeper, getSchemaPath(schemaId), -1);
-                                } catch (InterruptedException | 
KeeperException e) {
-                                    future.completeExceptionally(e);
+                CompletableFuture<Long> future = new CompletableFuture<>();
+                getLocator(schemaId).whenComplete((locator, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(ex);
+                    } else {
+                        if (!locator.isPresent()) {
+                            future.complete(null);
+                            return;
+                        }
+                        List<SchemaStorageFormat.IndexEntry> indexEntryList = 
locator.get().locator.getIndexList();
+                        List<CompletableFuture<Void>> deleteFutures = new 
ArrayList<>(indexEntryList.size());
+                        indexEntryList.forEach(indexEntry -> {
+                            final long ledgerId = 
indexEntry.getPosition().getLedgerId();
+                            CompletableFuture<Void> deleteFuture = new 
CompletableFuture<>();
+                            deleteFutures.add(deleteFuture);
+                            bookKeeper.asyncDeleteLedger(ledgerId, (int rc, 
Object cnx) -> {
+                                if (rc != BKException.Code.OK) {
+                                    // It's not a serious error, we didn't 
need call future.completeExceptionally()
+                                    log.warn("Failed to delete ledger {} of 
{}: {}", ledgerId, schemaId, rc);
                                 }
-                                clearLocatorCache(getSchemaPath(schemaId));
-                                future.complete(version);
-                            }
-                        }, null);
+                                deleteFuture.complete(null);
+                            }, null);
+                        });
+                        FutureUtil.waitForAll(deleteFutures).whenComplete((v, 
e) -> {
+                            final String path = getSchemaPath(schemaId);
+                            ZkUtils.asyncDeleteFullPathOptimistic(zooKeeper, 
path, -1, (rc, path1, ctx) -> {
+                                if (rc != Code.OK.intValue()) {
+                                    
future.completeExceptionally(KeeperException.create(Code.get(rc)));
+                                } else {
+                                    clearLocatorCache(getSchemaPath(schemaId));
+                                    future.complete(version);
+                                }
+                            }, path);
+
+                        });
                     }
-                    return future;
-                } else {
-                    // It should never reach here
-                    log.warn("No ledgers for schema id: {}", schemaId);
-                    return completedFuture(version);
-                }
+                });
+                return future;
             }
         });
     }
@@ -578,10 +585,6 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                         if (rc != BKException.Code.OK) {
                             future.completeExceptionally(bkException("Failed 
to create ledger", rc, -1, -1));
                         } else {
-                            schemaLedgers.computeIfAbsent(
-                                    schemaId,
-                                    key -> Collections.synchronizedList(new 
ArrayList<>())
-                            ).add(handle.getId());
                             future.complete(handle);
                         }
                     }, null, metadata);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 595da2b..d72cbf1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -20,13 +20,23 @@ package org.apache.pulsar.schema;
 
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static 
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
 import java.util.Collections;
-
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -39,14 +49,11 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 public class SchemaTest extends MockedPulsarServiceBaseTest {
 
@@ -206,4 +213,62 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
             }
         }
     }
+
+    @Test
+    public void testDeleteTopicAndSchema() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicName = "test-delete-topic-and-schema";
+
+        final String topic = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicName).toString();
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME));
+
+        @Cleanup
+        Producer<Schemas.PersonOne> p1 = 
pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Producer<Schemas.PersonThree> p2 = 
pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic)
+                .create();
+
+        List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> 
schemaFutures =
+                
this.pulsar.getSchemaRegistryService().getAllSchemas(TopicName.get(topic).getSchemaName()).get();
+        FutureUtil.waitForAll(schemaFutures).get();
+        List<SchemaRegistry.SchemaAndMetadata> schemas = 
schemaFutures.stream().map(future -> {
+            try {
+                return future.get();
+            } catch (Exception e) {
+                return null;
+            }
+        }).collect(Collectors.toList());
+
+        assertEquals(schemas.size(), 2);
+        for (SchemaRegistry.SchemaAndMetadata schema : schemas) {
+            assertNotNull(schema);
+        }
+
+        List<Long> ledgers = 
((BookkeeperSchemaStorage)this.pulsar.getSchemaStorage())
+                .getSchemaLedgerList(TopicName.get(topic).getSchemaName());
+        assertEquals(ledgers.size(), 2);
+        admin.topics().delete(topic, true, true);
+        assertEquals(this.pulsar.getSchemaRegistryService()
+                
.trimDeletedSchemaAndGetList(TopicName.get(topic).getSchemaName()).get().size(),
 0);
+
+        for (Long ledger : ledgers) {
+            try {
+                pulsar.getBookKeeperClient().openLedger(ledger, 
BookKeeper.DigestType.CRC32, new byte[]{});
+                fail();
+            } catch (BKException.BKNoSuchLedgerExistsException ignore) {
+            }
+        }
+    }
 }

Reply via email to