This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b4b05b530dc [fix][broker]Topic deleting failed after removed local 
cluster from namespace policies (#25114)
b4b05b530dc is described below

commit b4b05b530dc623f128e744b225908141a4986cda
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jan 7 12:41:43 2026 +0800

    [fix][broker]Topic deleting failed after removed local cluster from 
namespace policies (#25114)
---
 .../broker/service/persistent/PersistentTopic.java | 43 ++++++++++------------
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  1 +
 2 files changed, 20 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 22f3be27d58..ae1177d295e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1273,34 +1273,29 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 .getTransactionPendingAckStoreSuffix(topic,
                         Codec.encode(subscriptionName)));
         if 
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
-            CompletableFuture<ManagedLedgerConfig> managedLedgerConfig = 
getBrokerService().getManagedLedgerConfig(tn);
-            managedLedgerConfig.thenAccept(config -> {
-                ManagedLedgerFactory managedLedgerFactory =
-                        getBrokerService().getManagedLedgerFactoryForTopic(tn, 
config.getStorageClassName());
+            ManagedLedgerConfig managedLedgerConfig = ledger.getConfig();
+                ManagedLedgerFactory managedLedgerFactory = getBrokerService()
+                        .getManagedLedgerFactoryForTopic(tn, 
managedLedgerConfig.getStorageClassName());
                 
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
-                        managedLedgerConfig,
-                        new AsyncCallbacks.DeleteLedgerCallback() {
-                            @Override
-                            public void deleteLedgerComplete(Object ctx) {
+                    CompletableFuture.completedFuture(managedLedgerConfig),
+                    new AsyncCallbacks.DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
+                        }
+
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                            if (exception instanceof 
MetadataNotFoundException) {
                                 
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
+                                return;
                             }
 
-                            @Override
-                            public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                if (exception instanceof 
MetadataNotFoundException) {
-                                    
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
-                                    return;
-                                }
-
-                                
unsubscribeFuture.completeExceptionally(exception);
-                                log.error("[{}][{}] Error deleting 
subscription pending ack store",
-                                        topic, subscriptionName, exception);
-                            }
-                        }, null);
-            }).exceptionally(ex -> {
-                unsubscribeFuture.completeExceptionally(ex);
-                return null;
-            });
+                            unsubscribeFuture.completeExceptionally(exception);
+                            log.error("[{}][{}] Error deleting subscription 
pending ack store",
+                                    topic, subscriptionName, exception);
+                        }
+                    }, null);
         } else {
             asyncDeleteCursorWithClearDelayedMessage(subscriptionName, 
unsubscribeFuture);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 60672845b5e..db16963f208 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -495,6 +495,7 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         admin1.namespaces().createNamespace(ns1);
         admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
         admin1.topics().createNonPartitionedTopic(topic);
+        admin1.topics().createSubscription(topic, "s1", MessageId.earliest);
 
         // Wait for loading topic up.
         Producer<String> p = 
client1.newProducer(Schema.STRING).topic(topic).create();

Reply via email to