This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9f336506b1c [fix][broker] Fix deleting topic not delete the related 
topic policy and schema. (#21093)
9f336506b1c is described below

commit 9f336506b1c0967323a897c91eca649ab0511479
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 1 18:07:46 2023 +0800

    [fix][broker] Fix deleting topic not delete the related topic policy and 
schema. (#21093)
    
    Fixes #21075
    
    ### Motivation
    
    When the topic is loaded, it will delete the topic-level policy if it is 
enabled. But if the topic is not loaded, it will directly delete through 
managed ledger factory. But then we will leave the topic policy there. When the 
topic is created next time, it will use the old topic policy
    
    ### Modifications
    
    When deleting the topic, delete the schema and topic policies even if the 
topic is not loaded.
---
 .../pulsar/broker/service/AbstractTopic.java       | 17 +------
 .../pulsar/broker/service/BrokerService.java       | 55 +++++++++++++++++-----
 .../broker/service/BrokerBkEnsemblesTests.java     | 32 ++++++++++---
 .../systopic/PartitionedSystemTopicTest.java       | 25 ++++++++++
 4 files changed, 94 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index b15f8cbf0b8..cef2dd2080c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -58,7 +58,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedExc
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -674,21 +673,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     @Override
     public CompletableFuture<SchemaVersion> deleteSchema() {
-        String id = getSchemaId();
-        SchemaRegistryService schemaRegistryService = 
brokerService.pulsar().getSchemaRegistryService();
-        return 
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
-                .thenCompose(schema -> {
-                    if (schema != null) {
-                        // It's different from `SchemasResource.deleteSchema`
-                        // because when we delete a topic, the schema
-                        // history is meaningless. But when we delete a schema 
of a topic, a new schema could be
-                        // registered in the future.
-                        log.info("Delete schema storage of id: {}", id);
-                        return schemaRegistryService.deleteSchemaStorage(id);
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
+        return brokerService.deleteSchema(TopicName.get(getName()));
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6c48e4d9ae8..2cf141ed329 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -119,6 +119,8 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -159,6 +161,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -1156,26 +1159,33 @@ public class BrokerService implements Closeable {
         CompletableFuture<Void> future = new CompletableFuture<>();
         CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
         deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
-
-        deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
+        deleteTopicAuthenticationFuture
+        .thenCompose(__ -> deleteSchema(tn))
+        .thenCompose(__ -> {
+            if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
+                    && getPulsar().getConfiguration().isSystemTopicEnabled()) {
+                return deleteTopicPolicies(tn);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).whenComplete((v, ex) -> {
             if (ex != null) {
                 future.completeExceptionally(ex);
                 return;
             }
             CompletableFuture<ManagedLedgerConfig> mlConfigFuture = 
getManagedLedgerConfig(topicName);
             managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
-                    mlConfigFuture, new DeleteLedgerCallback() {
-                        @Override
-                        public void deleteLedgerComplete(Object ctx) {
-                            future.complete(null);
-                        }
+                mlConfigFuture, new DeleteLedgerCallback() {
+                    @Override
+                    public void deleteLedgerComplete(Object ctx) {
+                        future.complete(null);
+                    }
 
-                        @Override
-                        public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
-                            future.completeExceptionally(exception);
-                        }
-                    }, null);
-        });
+                    @Override
+                    public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                        future.completeExceptionally(exception);
+                    }
+                }, null);
+         });
 
         return future;
     }
@@ -3440,6 +3450,25 @@ public class BrokerService implements Closeable {
                 
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
+    CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+        String base = topicName.getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        SchemaRegistryService schemaRegistryService = 
getPulsar().getSchemaRegistryService();
+        return 
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
+                .thenCompose(schema -> {
+                    if (schema != null) {
+                        // It's different from `SchemasResource.deleteSchema`
+                        // because when we delete a topic, the schema
+                        // history is meaningless. But when we delete a schema 
of a topic, a new schema could be
+                        // registered in the future.
+                        log.info("Delete schema storage of id: {}", id);
+                        return 
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
     private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName 
topicName, int numPartitions) {
         return pulsar.getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(topicName.getNamespaceObject())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 9f19bda3647..40649a41640 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.service;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
-
 import java.lang.reflect.Field;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
@@ -31,10 +31,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -47,6 +45,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -497,10 +496,31 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
             // Expected
         }
 
-        // Deletion must succeed
-        admin.topics().delete(topic);
+        assertThrows(PulsarAdminException.ServerSideErrorException.class, () 
-> admin.topics().delete(topic));
+    }
+
+    @Test
+    public void testDeleteTopicWithoutTopicLoaded() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName("prop/usc");
+        admin.namespaces().createNamespace(namespace);
+
+        String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
 
-        // Topic will not be there after
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        producer.close();
+        admin.topics().unload(topic);
+
+        admin.topics().delete(topic);
         assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), 
Optional.empty());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 6f56852cae3..4af0bd90523 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.systopic;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import com.google.common.collect.Sets;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -37,6 +39,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -55,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
@@ -299,4 +303,25 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         writer1.get().close();
         writer2.get().close();
     }
+
+    @Test
+    public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws 
Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = 
"persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded";
+        admin.topics().createNonPartitionedTopic(topicName);
+        
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
+        admin.topicPolicies().setMaxConsumers(topicName, 2);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2));
+        CompletableFuture<Optional<Topic>> topic = 
pulsar.getBrokerService().getTopic(topicName, false);
+        PersistentTopic persistentTopic = (PersistentTopic) topic.join().get();
+        persistentTopic.close();
+        admin.topics().delete(topicName);
+        TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName));
+        assertNull(topicPolicies);
+        String base = TopicName.get(topicName).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        CompletableFuture<SchemaRegistry.SchemaAndMetadata> schema = 
pulsar.getSchemaRegistryService().getSchema(id);
+        assertNull(schema.join());
+    }
 }

Reply via email to