This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 29fcac429bb [fix][broker] Trigger topic creation event only once for 
non-existent topic (#24802)
29fcac429bb is described below

commit 29fcac429bb79cb6acb4d480484026d46651fa16
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Nov 6 15:48:49 2025 +0800

    [fix][broker] Trigger topic creation event only once for non-existent topic 
(#24802)
    
    (cherry picked from commit d168ab8ada8e586d4e2e2f78ff11762017ff7051)
---
 .../pulsar/broker/service/BrokerService.java       | 15 +++-
 .../pulsar/broker/TopicEventsListenerTest.java     | 86 +++++++++++++++++++---
 .../pulsar/broker/service/PersistentTopicTest.java |  3 +
 .../pulsar/broker/service/ServerCnxTest.java       |  3 +
 4 files changed, 91 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2c11aa0fd61..2e47852136e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1754,7 +1754,12 @@ public class BrokerService implements Closeable {
                 : CompletableFuture.completedFuture(null);
 
         maxTopicsCheck.thenCompose(__ ->
-                        
getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> {
+                        getManagedLedgerConfig(topicName))
+                
.thenCombine(pulsar().getNamespaceService().checkTopicExistsAsync(topicName).thenApply(n
 -> {
+                            boolean found = n.isExists();
+                            n.recycle();
+                            return found;
+                        }), (managedLedgerConfig, exists) -> {
             if (isBrokerEntryMetadataEnabled() || 
isBrokerPayloadProcessorEnabled()) {
                 // init managedLedger interceptor
                 Set<BrokerEntryMetadataInterceptor> interceptors = new 
HashSet<>();
@@ -1789,8 +1794,10 @@ public class BrokerService implements Closeable {
             });
 
             if (createIfMissing) {
-                topicEventsDispatcher.notify(topic, TopicEvent.CREATE, 
EventStage.BEFORE);
-                topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, 
TopicEvent.CREATE);
+                if (!exists) {
+                    topicEventsDispatcher.notify(topic, TopicEvent.CREATE, 
EventStage.BEFORE);
+                    topicEventsDispatcher.notifyOnCompletion(topicFuture, 
topic, TopicEvent.CREATE);
+                }
             }
             topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, 
TopicEvent.LOAD);
 
@@ -1877,7 +1884,7 @@ public class BrokerService implements Closeable {
                             }
                         }
                     }, () -> isTopicNsOwnedByBrokerAsync(topicName), null);
-
+            return null;
         }).exceptionally((exception) -> {
             log.warn("[{}] Failed to get topic configuration: {}", topic, 
exception.getMessage(), exception);
             // remove topic from topics-map in different thread to avoid 
possible deadlock if
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index 152b4aeeeb2..123f7ca824e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.broker;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
-
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -44,9 +46,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
 @Slf4j
 public class TopicEventsListenerTest extends BrokerTestBase {
 
@@ -266,14 +265,30 @@ public class TopicEventsListenerTest extends 
BrokerTestBase {
         final String[] expectedEvents;
         if (topicDomain.equalsIgnoreCase("persistent") || 
topicTypePartitioned.equals("partitioned")) {
             if (topicTypePartitioned.equals("partitioned")) {
-                expectedEvents = new String[]{
-                        "CREATE__BEFORE",
-                        "CREATE__SUCCESS",
-                        "LOAD__BEFORE",
-                        "CREATE__BEFORE",
-                        "CREATE__SUCCESS",
-                        "LOAD__SUCCESS"
-                };
+                if (topicDomain.equalsIgnoreCase("persistent")) {
+                    expectedEvents = new String[]{
+                            "CREATE__BEFORE",
+                            "CREATE__SUCCESS",
+                            "LOAD__BEFORE",
+                            "LOAD__SUCCESS"
+                    };
+                } else {
+                    // For non-persistent partitioned topic, only metadata is 
initially created;
+                    // partitions are created when the client connects.
+                    // PR #23680 currently records creation events at metadata 
creation,
+                    // and the broker records them again when partitions are 
loaded,
+                    // which can result in multiple events.
+                    // Ideally, #23680 should not record the event here,
+                    // because the topic is not fully created until the client 
connects.
+                    expectedEvents = new String[]{
+                            "CREATE__BEFORE",
+                            "CREATE__SUCCESS",
+                            "LOAD__BEFORE",
+                            "CREATE__BEFORE",
+                            "CREATE__SUCCESS",
+                            "LOAD__SUCCESS",
+                    };
+                }
             } else {
                 expectedEvents = new String[]{
                         "LOAD__BEFORE",
@@ -309,6 +324,53 @@ public class TopicEventsListenerTest extends 
BrokerTestBase {
                 Assert.assertEquals(events.toArray(), expectedEvents));
     }
 
+    @DataProvider(name = "createTopicEventType")
+    public static Object[][] createTopicEventType() {
+        return new Object[][] {
+                {"persistent", "partitioned"},
+                {"persistent", "non-partitioned"},
+                {"non-persistent", "partitioned"},
+                {"non-persistent", "non-partitioned"},
+        };
+    }
+
+    @Test(dataProvider = "createTopicEventType")
+    public void testCreateTopicEvent(String topicTypePersistence, String 
topicTypePartitioned) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
+
+        events.clear();
+        if (topicTypePartitioned.equals("partitioned")) {
+            topicNameToWatch = topicName + "-partition-0";
+            admin.topics().createPartitionedTopic(topicName, 1);
+        } else {
+            topicNameToWatch = topicName;
+            admin.topics().createNonPartitionedTopic(topicName);
+        }
+
+        triggerPartitionsCreation(topicName); // ensure partitions are really 
created
+        triggerPartitionsCreation(topicName); // trigger again to ensure no 
duplicate events
+
+        Awaitility.await().during(3, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    if (topicTypePartitioned.equals("partitioned") && 
topicTypePersistence.equals("non-persistent")) {
+                        // For non-persistent partitioned topic, only metadata 
is initially created;
+                        // partitions are created when the client connects.
+                        // PR #23680 currently records creation events at 
metadata creation,
+                        // and the broker records them again when partitions 
are loaded,
+                        // which can result in multiple events.
+                        // Ideally, #23680 should not record the event here,
+                        // because the topic is not fully created until the 
client connects.
+                        assertThat(events.toArray())
+                                .contains("CREATE__BEFORE")
+                                .contains("CREATE__SUCCESS");
+                    } else {
+                        assertThat(events.toArray())
+                                .containsOnlyOnce("CREATE__BEFORE")
+                                .containsOnlyOnce("CREATE__SUCCESS");
+                    }
+                });
+    }
+
     private PulsarAdmin createPulsarAdmin() throws PulsarClientException {
         return PulsarAdmin.builder()
                 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
brokerUrlTls.toString())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 1938567387a..853ede391e2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -95,6 +95,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -218,6 +219,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
         
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
 any());
+        
doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(nsSvc)
+                .checkTopicExistsAsync(any());
 
         setupMLAsyncCallbackMocks();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 760a9d74df2..b4f0a1d4935 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -103,6 +103,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -236,6 +237,8 @@ public class ServerCnxTest {
                 NamespaceName.get("use", "ns-abc"), 
CommandGetTopicsOfNamespace.Mode.ALL);
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics(
                 NamespaceName.get("use", "ns-abc"));
+        
doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(namespaceService)
+                .checkTopicExistsAsync(any());
 
         setupMLAsyncCallbackMocks();
 

Reply via email to