This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 746c3efd536 [fix][client][branch-4.0] Partitioned topics are unexpectedly created by client after deletion (#24554) (#24571) 746c3efd536 is described below commit 746c3efd5361352e50efc15e02523c4f477e8681 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Tue Jul 29 22:00:35 2025 +0800 [fix][client][branch-4.0] Partitioned topics are unexpectedly created by client after deletion (#24554) (#24571) (cherry picked from commit 16271dc888c20d3e2233f1717b37f6f13fcb8af3) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++- .../pulsar/broker/admin/TopicAutoCreationTest.java | 63 +++++++++++++++++++--- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- .../client/impl/PartitionedProducerImpl.java | 2 +- 4 files changed, 72 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 869f60cad31..86d34dae850 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3064,7 +3064,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Truncate to ensure the offloaded data is not orphaned. // Also ensures the BK ledgers are deleted and not just scheduled for deletion - CompletableFuture<Void> truncateFuture = asyncTruncate(); + CompletableFuture<Void> truncateFuture = asyncTruncate(true); truncateFuture.whenComplete((ignore, exc) -> { if (exc != null) { log.error("[{}] Error truncating ledger for deletion", name, exc); @@ -4472,6 +4472,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public CompletableFuture<Void> asyncTruncate() { + return asyncTruncate(false); + } + + // When asyncTruncate is called by asyncDelete, the argument should be true because cursors will not be accessed + // after the managed ledger is deleted. + private CompletableFuture<Void> asyncTruncate(boolean ignoreCursorFailure) { final List<CompletableFuture<Void>> futures = new ArrayList(); for (ManagedCursor cursor : cursors) { @@ -4484,7 +4490,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); + if (ignoreCursorFailure) { + log.warn("Failed to clear backlog for cursor {}", cursor.getName(), exception); + future.complete(null); + } else { + future.completeExceptionally(exception); + } } }, null); futures.add(future); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 45c7dbea2ab..2c8aae6fbe5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -24,7 +24,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.io.Closeable; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -37,17 +39,20 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.TopicType; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = "broker-admin") @@ -55,7 +60,7 @@ import org.testng.annotations.Test; public class TopicAutoCreationTest extends ProducerConsumerBase { @Override - @BeforeMethod + @BeforeClass protected void setup() throws Exception { conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); conf.setAllowAutoTopicCreation(true); @@ -71,7 +76,7 @@ public class TopicAutoCreationTest extends ProducerConsumerBase { } @Override - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { super.internalCleanup(); } @@ -87,9 +92,11 @@ public class TopicAutoCreationTest extends ProducerConsumerBase { .create(); List<String> partitionedTopics = admin.topics().getPartitionedTopicList(namespaceName); + assertTrue(partitionedTopics.contains(topic)); List<String> topics = admin.topics().getList(namespaceName); - assertEquals(partitionedTopics.size(), 1); - assertEquals(topics.size(), 3); + for (int i = 0; i < conf.getDefaultNumPartitions(); i++) { + assertTrue(topics.contains(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + } producer.close(); for (String t : topics) { @@ -248,4 +255,48 @@ public class TopicAutoCreationTest extends ProducerConsumerBase { admin.namespaces().deleteNamespace(namespace, true); } + @Test + public void testPartitionsNotCreatedAfterDeletion() throws Exception { + @Cleanup final var client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + final var topicName = TopicName.get("my-property/my-ns/testPartitionsNotCreatedAfterDeletion"); + final var topic = topicName.toString(); + final var interval = Duration.ofSeconds(1); + final ThrowableConsumer<ThrowableSupplier<Closeable>> verifier = creator -> { + admin.topics().createPartitionedTopic(topic, 1); + boolean needCleanup = false; + try (final var ignored = creator.get()) { + admin.topics().terminatePartitionedTopic(topic); + admin.topics().deletePartitionedTopic(topic, true); + Thread.sleep(interval.toMillis() + 500); // wait until the auto update partitions task has run + + final var topics = admin.topics().getList(topicName.getNamespace()).stream() + .filter(__ -> __.contains(topicName.getLocalName())).toList(); + // Without https://github.com/apache/pulsar/pull/24118, the producer or consumer on partition 0 could be + // automatically created. + if (!topics.isEmpty()) { + assertEquals(topics, List.of(topicName.getPartition(0).toString())); + needCleanup = true; + } + } + if (needCleanup) { + admin.topics().delete(topicName.getPartition(0).toString()); + } + }; + verifier.accept(() -> client.newProducer().topic(topic) + .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create()); + verifier.accept(() -> client.newConsumer().topic(topic).subscriptionName("sub") + .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).subscribe()); + verifier.accept(() -> client.newReader().topic(topic).startMessageId(MessageId.earliest) + .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create()); + } + + private interface ThrowableConsumer<T> { + + void accept(T value) throws Exception; + } + + public interface ThrowableSupplier<T> { + + T get() throws Exception; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 451ddaf76db..359eb1d2c96 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1395,7 +1395,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicName) { int oldPartitionNumber = partitionedTopics.get(topicName); - return client.getPartitionsForTopic(topicName).thenCompose(list -> { + return client.getPartitionsForTopic(topicName, false).thenCompose(list -> { int currentPartitionNumber = Long.valueOf(list.stream() .filter(t -> TopicName.get(t).isPartitioned()).count()).intValue(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 77ed7d3454b..cee86de8d59 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -391,7 +391,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { return future; } - client.getPartitionsForTopic(topic).thenCompose(list -> { + client.getPartitionsForTopic(topic, false).thenCompose(list -> { int oldPartitionNumber = topicMetadata.numPartitions(); int currentPartitionNumber = list.size();