This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d168ab8ada8 [fix][broker] Trigger topic creation event only once for
non-existent topic (#24802)
d168ab8ada8 is described below
commit d168ab8ada8e586d4e2e2f78ff11762017ff7051
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Nov 6 15:48:49 2025 +0800
[fix][broker] Trigger topic creation event only once for non-existent topic
(#24802)
---
.../pulsar/broker/service/BrokerService.java | 14 ++--
.../pulsar/broker/TopicEventsListenerTest.java | 80 +++++++++++++++++++---
.../pulsar/broker/service/PersistentTopicTest.java | 3 +
.../pulsar/broker/service/ServerCnxTest.java | 3 +
4 files changed, 88 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index db58e2e6525..3323b9db713 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1824,7 +1824,11 @@ public class BrokerService implements Closeable {
maxTopicsCheck.thenCompose(partitionedTopicMetadata ->
validateTopicConsistency(topicName))
.thenCompose(__ -> isTopicAlreadyMigrated)
.thenCompose(__ -> getManagedLedgerConfig(topicName))
- .thenAccept(managedLedgerConfig -> {
+
.thenCombine(pulsar().getNamespaceService().checkTopicExistsAsync(topicName).thenApply(n
-> {
+ boolean found = n.isExists();
+ n.recycle();
+ return found;
+ }), (managedLedgerConfig, exists) -> {
if (isBrokerEntryMetadataEnabled() ||
isBrokerPayloadProcessorEnabled()) {
// init managedLedger interceptor
Set<BrokerEntryMetadataInterceptor> interceptors = new
HashSet<>();
@@ -1861,8 +1865,10 @@ public class BrokerService implements Closeable {
});
if (createIfMissing) {
- topicEventsDispatcher.notify(topic, TopicEvent.CREATE,
EventStage.BEFORE);
- topicEventsDispatcher.notifyOnCompletion(topicFuture, topic,
TopicEvent.CREATE);
+ if (!exists) {
+ topicEventsDispatcher.notify(topic, TopicEvent.CREATE,
EventStage.BEFORE);
+ topicEventsDispatcher.notifyOnCompletion(topicFuture,
topic, TopicEvent.CREATE);
+ }
}
topicEventsDispatcher.notifyOnCompletion(loadFuture, topic,
TopicEvent.LOAD);
@@ -1952,7 +1958,7 @@ public class BrokerService implements Closeable {
}
}
}, () -> isTopicNsOwnedByBrokerAsync(topicName), null);
-
+ return null;
}).exceptionally((exception) -> {
boolean migrationFailure = exception.getCause() instanceof
TopicMigratedException;
String msg = migrationFailure ? "Topic is already migrated" :
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index b597d98efc3..fb45094c569 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
@@ -265,14 +266,30 @@ public class TopicEventsListenerTest extends
BrokerTestBase {
final String[] expectedEvents;
if (topicDomain.equalsIgnoreCase("persistent") ||
topicTypePartitioned.equals("partitioned")) {
if (topicTypePartitioned.equals("partitioned")) {
- expectedEvents = new String[]{
- "CREATE__BEFORE",
- "CREATE__SUCCESS",
- "LOAD__BEFORE",
- "CREATE__BEFORE",
- "CREATE__SUCCESS",
- "LOAD__SUCCESS"
- };
+ if (topicDomain.equalsIgnoreCase("persistent")) {
+ expectedEvents = new String[]{
+ "CREATE__BEFORE",
+ "CREATE__SUCCESS",
+ "LOAD__BEFORE",
+ "LOAD__SUCCESS"
+ };
+ } else {
+ // For non-persistent partitioned topic, only metadata is
initially created;
+ // partitions are created when the client connects.
+ // PR #23680 currently records creation events at metadata
creation,
+ // and the broker records them again when partitions are
loaded,
+ // which can result in multiple events.
+ // Ideally, #23680 should not record the event here,
+ // because the topic is not fully created until the client
connects.
+ expectedEvents = new String[]{
+ "CREATE__BEFORE",
+ "CREATE__SUCCESS",
+ "LOAD__BEFORE",
+ "CREATE__BEFORE",
+ "CREATE__SUCCESS",
+ "LOAD__SUCCESS",
+ };
+ }
} else {
expectedEvents = new String[]{
"LOAD__BEFORE",
@@ -308,6 +325,53 @@ public class TopicEventsListenerTest extends
BrokerTestBase {
Assert.assertEquals(events.toArray(), expectedEvents));
}
+ @DataProvider(name = "createTopicEventType")
+ public static Object[][] createTopicEventType() {
+ return new Object[][] {
+ {"persistent", "partitioned"},
+ {"persistent", "non-partitioned"},
+ {"non-persistent", "partitioned"},
+ {"non-persistent", "non-partitioned"},
+ };
+ }
+
+ @Test(dataProvider = "createTopicEventType")
+ public void testCreateTopicEvent(String topicTypePersistence, String
topicTypePartitioned) throws Exception {
+ String topicName = topicTypePersistence + "://" + namespace + "/" +
"topic-" + UUID.randomUUID();
+
+ events.clear();
+ if (topicTypePartitioned.equals("partitioned")) {
+ topicNameToWatch = topicName + "-partition-0";
+ admin.topics().createPartitionedTopic(topicName, 1);
+ } else {
+ topicNameToWatch = topicName;
+ admin.topics().createNonPartitionedTopic(topicName);
+ }
+
+ triggerPartitionsCreation(topicName); // ensure partitions are really
created
+ triggerPartitionsCreation(topicName); // trigger again to ensure no
duplicate events
+
+ Awaitility.await().during(3, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ if (topicTypePartitioned.equals("partitioned") &&
topicTypePersistence.equals("non-persistent")) {
+ // For non-persistent partitioned topic, only metadata
is initially created;
+ // partitions are created when the client connects.
+ // PR #23680 currently records creation events at
metadata creation,
+ // and the broker records them again when partitions
are loaded,
+ // which can result in multiple events.
+ // Ideally, #23680 should not record the event here,
+ // because the topic is not fully created until the
client connects.
+ assertThat(events.toArray())
+ .contains("CREATE__BEFORE")
+ .contains("CREATE__SUCCESS");
+ } else {
+ assertThat(events.toArray())
+ .containsOnlyOnce("CREATE__BEFORE")
+ .containsOnlyOnce("CREATE__SUCCESS");
+ }
+ });
+ }
+
private PulsarAdmin createPulsarAdmin() throws PulsarClientException {
return PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() :
brokerUrlTls.toString())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d4306a11859..37e1e058cc2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -101,6 +101,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -226,6 +227,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
any());
+
doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(nsSvc)
+ .checkTopicExistsAsync(any());
setupMLAsyncCallbackMocks();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 2cfbac35bfc..a19bc017574 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -105,6 +105,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -237,6 +238,8 @@ public class ServerCnxTest {
NamespaceName.get("use", "ns-abc"),
CommandGetTopicsOfNamespace.Mode.ALL);
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics(
NamespaceName.get("use", "ns-abc"));
+
doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(namespaceService)
+ .checkTopicExistsAsync(any());
setupMLAsyncCallbackMocks();