This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 746c3efd536 [fix][client][branch-4.0] Partitioned topics are unexpectedly created by client after deletion (#24554) (#24571)
746c3efd536 is described below

commit 746c3efd5361352e50efc15e02523c4f477e8681
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Tue Jul 29 22:00:35 2025 +0800

    [fix][client][branch-4.0] Partitioned topics are unexpectedly created by client after deletion (#24554) (#24571)
    
    (cherry picked from commit 16271dc888c20d3e2233f1717b37f6f13fcb8af3)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++-
 .../pulsar/broker/admin/TopicAutoCreationTest.java | 63 +++++++++++++++++++---
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 .../client/impl/PartitionedProducerImpl.java       |  2 +-
 4 files changed, 72 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 869f60cad31..86d34dae850 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3064,7 +3064,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         // Truncate to ensure the offloaded data is not orphaned.
         // Also ensures the BK ledgers are deleted and not just scheduled for 
deletion
-        CompletableFuture<Void> truncateFuture = asyncTruncate();
+        CompletableFuture<Void> truncateFuture = asyncTruncate(true);
         truncateFuture.whenComplete((ignore, exc) -> {
             if (exc != null) {
                 log.error("[{}] Error truncating ledger for deletion", name, 
exc);
@@ -4472,6 +4472,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     @Override
     public CompletableFuture<Void> asyncTruncate() {
+        return asyncTruncate(false);
+    }
+
+    // When asyncTruncate is called by asyncDelete, the argument should be 
true because cursors will not be accessed
+    // after the managed ledger is deleted.
+    private CompletableFuture<Void> asyncTruncate(boolean ignoreCursorFailure) 
{
 
         final List<CompletableFuture<Void>> futures = new ArrayList();
         for (ManagedCursor cursor : cursors) {
@@ -4484,7 +4490,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
                 @Override
                 public void clearBacklogFailed(ManagedLedgerException 
exception, Object ctx) {
-                    future.completeExceptionally(exception);
+                    if (ignoreCursorFailure) {
+                        log.warn("Failed to clear backlog for cursor {}", 
cursor.getName(), exception);
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(exception);
+                    }
                 }
             }, null);
             futures.add(future);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 45c7dbea2ab..2c8aae6fbe5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -24,7 +24,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -37,17 +39,20 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.TopicType;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
@@ -55,7 +60,7 @@ import org.testng.annotations.Test;
 public class TopicAutoCreationTest extends ProducerConsumerBase {
 
     @Override
-    @BeforeMethod
+    @BeforeClass
     protected void setup() throws Exception {
         conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
         conf.setAllowAutoTopicCreation(true);
@@ -71,7 +76,7 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
     }
 
     @Override
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
         super.internalCleanup();
     }
@@ -87,9 +92,11 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
                 .create();
 
         List<String> partitionedTopics = 
admin.topics().getPartitionedTopicList(namespaceName);
+        assertTrue(partitionedTopics.contains(topic));
         List<String> topics = admin.topics().getList(namespaceName);
-        assertEquals(partitionedTopics.size(), 1);
-        assertEquals(topics.size(), 3);
+        for (int i = 0; i < conf.getDefaultNumPartitions(); i++) {
+            assertTrue(topics.contains(topic + 
TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+        }
 
         producer.close();
         for (String t : topics) {
@@ -248,4 +255,48 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
         admin.namespaces().deleteNamespace(namespace, true);
     }
 
+    @Test
+    public void testPartitionsNotCreatedAfterDeletion() throws Exception {
+        @Cleanup final var client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        final var topicName = 
TopicName.get("my-property/my-ns/testPartitionsNotCreatedAfterDeletion");
+        final var topic = topicName.toString();
+        final var interval = Duration.ofSeconds(1);
+        final ThrowableConsumer<ThrowableSupplier<Closeable>> verifier = 
creator -> {
+            admin.topics().createPartitionedTopic(topic, 1);
+            boolean needCleanup = false;
+            try (final var ignored = creator.get()) {
+                admin.topics().terminatePartitionedTopic(topic);
+                admin.topics().deletePartitionedTopic(topic, true);
+                Thread.sleep(interval.toMillis() + 500); // wait until the 
auto update partitions task has run
+
+                final var topics = 
admin.topics().getList(topicName.getNamespace()).stream()
+                        .filter(__ -> 
__.contains(topicName.getLocalName())).toList();
+                // Without https://github.com/apache/pulsar/pull/24118, the 
producer or consumer on partition 0 could be
+                // automatically created.
+                if (!topics.isEmpty()) {
+                    assertEquals(topics, 
List.of(topicName.getPartition(0).toString()));
+                    needCleanup = true;
+                }
+            }
+            if (needCleanup) {
+                admin.topics().delete(topicName.getPartition(0).toString());
+            }
+        };
+        verifier.accept(() -> client.newProducer().topic(topic)
+                .autoUpdatePartitionsInterval(interval.toSecondsPart(), 
TimeUnit.SECONDS).create());
+        verifier.accept(() -> 
client.newConsumer().topic(topic).subscriptionName("sub")
+                .autoUpdatePartitionsInterval(interval.toSecondsPart(), 
TimeUnit.SECONDS).subscribe());
+        verifier.accept(() -> 
client.newReader().topic(topic).startMessageId(MessageId.earliest)
+                .autoUpdatePartitionsInterval(interval.toSecondsPart(), 
TimeUnit.SECONDS).create());
+    }
+
+    private interface ThrowableConsumer<T> {
+
+        void accept(T value) throws Exception;
+    }
+
+    public interface ThrowableSupplier<T> {
+
+        T get() throws Exception;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 451ddaf76db..359eb1d2c96 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1395,7 +1395,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String 
topicName) {
         int oldPartitionNumber = partitionedTopics.get(topicName);
 
-        return client.getPartitionsForTopic(topicName).thenCompose(list -> {
+        return client.getPartitionsForTopic(topicName, false).thenCompose(list 
-> {
             int currentPartitionNumber = Long.valueOf(list.stream()
                     .filter(t -> 
TopicName.get(t).isPartitioned()).count()).intValue();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 77ed7d3454b..cee86de8d59 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -391,7 +391,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
                 return future;
             }
 
-            client.getPartitionsForTopic(topic).thenCompose(list -> {
+            client.getPartitionsForTopic(topic, false).thenCompose(list -> {
                 int oldPartitionNumber = topicMetadata.numPartitions();
                 int currentPartitionNumber = list.size();
 

Reply via email to