This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b4b05b530dc [fix][broker]Topic deleting failed after removed local
cluster from namespace policies (#25114)
b4b05b530dc is described below
commit b4b05b530dc623f128e744b225908141a4986cda
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jan 7 12:41:43 2026 +0800
[fix][broker]Topic deleting failed after removed local cluster from
namespace policies (#25114)
---
.../broker/service/persistent/PersistentTopic.java | 43 ++++++++++------------
.../service/OneWayReplicatorUsingGlobalZKTest.java | 1 +
2 files changed, 20 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 22f3be27d58..ae1177d295e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1273,34 +1273,29 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
.getTransactionPendingAckStoreSuffix(topic,
Codec.encode(subscriptionName)));
if
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
- CompletableFuture<ManagedLedgerConfig> managedLedgerConfig =
getBrokerService().getManagedLedgerConfig(tn);
- managedLedgerConfig.thenAccept(config -> {
- ManagedLedgerFactory managedLedgerFactory =
- getBrokerService().getManagedLedgerFactoryForTopic(tn,
config.getStorageClassName());
+ ManagedLedgerConfig managedLedgerConfig = ledger.getConfig();
+ ManagedLedgerFactory managedLedgerFactory = getBrokerService()
+ .getManagedLedgerFactoryForTopic(tn,
managedLedgerConfig.getStorageClassName());
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
- managedLedgerConfig,
- new AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
+ CompletableFuture.completedFuture(managedLedgerConfig),
+ new AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
+ }
+
+ @Override
+ public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
+ if (exception instanceof
MetadataNotFoundException) {
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
+ return;
}
- @Override
- public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- if (exception instanceof
MetadataNotFoundException) {
-
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
- return;
- }
-
-
unsubscribeFuture.completeExceptionally(exception);
- log.error("[{}][{}] Error deleting
subscription pending ack store",
- topic, subscriptionName, exception);
- }
- }, null);
- }).exceptionally(ex -> {
- unsubscribeFuture.completeExceptionally(ex);
- return null;
- });
+ unsubscribeFuture.completeExceptionally(exception);
+ log.error("[{}][{}] Error deleting subscription
pending ack store",
+ topic, subscriptionName, exception);
+ }
+ }, null);
} else {
asyncDeleteCursorWithClearDelayedMessage(subscriptionName,
unsubscribeFuture);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 60672845b5e..db16963f208 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -495,6 +495,7 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
admin1.namespaces().createNamespace(ns1);
admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
admin1.topics().createNonPartitionedTopic(topic);
+ admin1.topics().createSubscription(topic, "s1", MessageId.earliest);
// Wait for loading topic up.
Producer<String> p =
client1.newProducer(Schema.STRING).topic(topic).create();