This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9f336506b1c [fix][broker] Fix deleting topic not delete the related
topic policy and schema. (#21093)
9f336506b1c is described below
commit 9f336506b1c0967323a897c91eca649ab0511479
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 1 18:07:46 2023 +0800
[fix][broker] Fix deleting topic not delete the related topic policy and
schema. (#21093)
Fixes #21075
### Motivation
When the topic is loaded, deleting it also deletes the topic-level policy if
topic-level policies are enabled. But if the topic is not loaded, it is deleted
directly through the managed ledger factory, which leaves the topic policy
behind. When the topic is created next time, it will use the old topic policy.
### Modifications
When deleting the topic, delete the schema and topic policies even if the
topic is not loaded.
---
.../pulsar/broker/service/AbstractTopic.java | 17 +------
.../pulsar/broker/service/BrokerService.java | 55 +++++++++++++++++-----
.../broker/service/BrokerBkEnsemblesTests.java | 32 ++++++++++---
.../systopic/PartitionedSystemTopicTest.java | 25 ++++++++++
4 files changed, 94 insertions(+), 35 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index b15f8cbf0b8..cef2dd2080c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -58,7 +58,6 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedExc
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -674,21 +673,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
@Override
public CompletableFuture<SchemaVersion> deleteSchema() {
- String id = getSchemaId();
- SchemaRegistryService schemaRegistryService =
brokerService.pulsar().getSchemaRegistryService();
- return
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
- .thenCompose(schema -> {
- if (schema != null) {
- // It's different from `SchemasResource.deleteSchema`
- // because when we delete a topic, the schema
- // history is meaningless. But when we delete a schema
of a topic, a new schema could be
- // registered in the future.
- log.info("Delete schema storage of id: {}", id);
- return schemaRegistryService.deleteSchemaStorage(id);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- });
+ return brokerService.deleteSchema(TopicName.get(getName()));
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6c48e4d9ae8..2cf141ed329 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -119,6 +119,8 @@ import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -159,6 +161,7 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
@@ -1156,26 +1159,33 @@ public class BrokerService implements Closeable {
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<Void> deleteTopicAuthenticationFuture = new
CompletableFuture<>();
deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
-
- deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
+ deleteTopicAuthenticationFuture
+ .thenCompose(__ -> deleteSchema(tn))
+ .thenCompose(__ -> {
+ if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
+ && getPulsar().getConfiguration().isSystemTopicEnabled()) {
+ return deleteTopicPolicies(tn);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).whenComplete((v, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
return;
}
CompletableFuture<ManagedLedgerConfig> mlConfigFuture =
getManagedLedgerConfig(topicName);
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
- mlConfigFuture, new DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- future.complete(null);
- }
+ mlConfigFuture, new DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ future.complete(null);
+ }
- @Override
- public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
- future.completeExceptionally(exception);
- }
- }, null);
- });
+ @Override
+ public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ });
return future;
}
@@ -3440,6 +3450,25 @@ public class BrokerService implements Closeable {
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}
+ CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+ String base = topicName.getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ SchemaRegistryService schemaRegistryService =
getPulsar().getSchemaRegistryService();
+ return
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
+ .thenCompose(schema -> {
+ if (schema != null) {
+ // It's different from `SchemasResource.deleteSchema`
+ // because when we delete a topic, the schema
+ // history is meaningless. But when we delete a schema
of a topic, a new schema could be
+ // registered in the future.
+ log.info("Delete schema storage of id: {}", id);
+ return
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
+
private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName
topicName, int numPartitions) {
return pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 9f19bda3647..40649a41640 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
-
import java.lang.reflect.Field;
import java.util.Map.Entry;
import java.util.NavigableMap;
@@ -31,10 +31,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
import com.google.common.collect.Sets;
import lombok.Cleanup;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -47,6 +45,7 @@ import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -497,10 +496,31 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
// Expected
}
- // Deletion must succeed
- admin.topics().delete(topic);
+ assertThrows(PulsarAdminException.ServerSideErrorException.class, ()
-> admin.topics().delete(topic));
+ }
+
+ @Test
+ public void testDeleteTopicWithoutTopicLoaded() throws Exception {
+ String namespace = BrokerTestUtil.newUniqueName("prop/usc");
+ admin.namespaces().createNamespace(namespace);
+
+ String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
- // Topic will not be there after
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ producer.close();
+ admin.topics().unload(topic);
+
+ admin.topics().delete(topic);
assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(),
Optional.empty());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 6f56852cae3..4af0bd90523 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.systopic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -37,6 +39,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -55,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
@@ -299,4 +303,25 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
writer1.get().close();
writer2.get().close();
}
+
+ @Test
+ public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws
Exception {
+ final String ns = "prop/ns-test";
+ admin.namespaces().createNamespace(ns, 2);
+ final String topicName =
"persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded";
+ admin.topics().createNonPartitionedTopic(topicName);
+
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
+ admin.topicPolicies().setMaxConsumers(topicName, 2);
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2));
+ CompletableFuture<Optional<Topic>> topic =
pulsar.getBrokerService().getTopic(topicName, false);
+ PersistentTopic persistentTopic = (PersistentTopic) topic.join().get();
+ persistentTopic.close();
+ admin.topics().delete(topicName);
+ TopicPolicies topicPolicies =
pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName));
+ assertNull(topicPolicies);
+ String base = TopicName.get(topicName).getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ CompletableFuture<SchemaRegistry.SchemaAndMetadata> schema =
pulsar.getSchemaRegistryService().getSchema(id);
+ assertNull(schema.join());
+ }
}