This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b6f9fceeb5 [fix][broker] Allow intermittent error from topic policies 
service when loading topics (#24829)
7b6f9fceeb5 is described below

commit 7b6f9fceeb56a1e85bdd917e97393ab8cb19544b
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Oct 8 23:42:11 2025 +0800

    [fix][broker] Allow intermittent error from topic policies service when 
loading topics (#24829)
---
 .../pulsar/broker/service/BrokerService.java       | 18 ++++++++-------
 .../pulsar/broker/service/BrokerServiceTest.java   | 27 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bd38283ae41..5c19de44341 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1168,14 +1168,7 @@ public class BrokerService implements Closeable {
                     // The topic level policies are not needed now, but the 
meaning of calling
                     // "getTopicPoliciesBypassSystemTopic" will wait for 
system topic policies initialization.
                     getTopicPoliciesBypassSystemTopic(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY)
-                            .exceptionally(ex -> {
-                        final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
-                        final String errorInfo = String.format("Topic creation 
encountered an exception by initialize"
-                                + " topic policies service. topic_name=%s 
error_message=%s", topicName,
-                                rc.getMessage());
-                        log.error(errorInfo, rc);
-                        throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
-                    }).thenRun(() -> {
+                            .thenRun(() -> {
                         final var inserted = new MutableBoolean(false);
                         final var cachedFuture = 
topics.computeIfAbsent(topicName.toString(), ___ -> {
                             inserted.setTrue();
@@ -1195,6 +1188,15 @@ public class BrokerService implements Closeable {
                                 }
                             });
                         }
+                    }).exceptionally(e -> {
+                        pulsar.getExecutor().execute(() -> 
topics.remove(topicName.toString(), topicFuture));
+                        final Throwable rc = 
FutureUtil.unwrapCompletionException(e);
+                        final String errorInfo = String.format("Topic creation 
encountered an exception by initialize"
+                                        + " topic policies service. 
topic_name=%s error_message=%s", topicName,
+                                rc.getMessage());
+                        log.error(errorInfo, rc);
+                        topicFuture.completeExceptionally(rc);
+                        return null;
                     });
                 });
                 return topicFuture;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 62dcc37f38e..56a08eac209 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -125,6 +125,7 @@ import 
org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -148,7 +149,8 @@ public class BrokerServiceTest extends BrokerTestBase {
     @Override
     protected void setup() throws Exception {
         conf.setSystemTopicEnabled(false);
-        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(true);
+        
conf.setTopicPoliciesServiceClassName(MockTopicPoliciesService.class.getName());
         super.baseSetup();
     }
 
@@ -2036,5 +2038,28 @@ public class BrokerServiceTest extends BrokerTestBase {
         retryStrategically((test) -> sync.getProducer() != null, 1000, 10);
         assertNotNull(sync.getProducer());
     }
+
+    @Test
+    public void testGetTopicWhenTopicPoliciesFail() throws Exception {
+        final var topicName = 
TopicName.get("prop/ns-abc/test-get-topic-when-topic-policies-fail");
+        MockTopicPoliciesService.FAILED_TOPICS.add(topicName);
+        @Cleanup final var producer = 
pulsarClient.newProducer().topic(topicName.toString()).create();
+        
assertFalse(MockTopicPoliciesService.FAILED_TOPICS.contains(topicName));
+    }
+
+    static class MockTopicPoliciesService extends 
TopicPoliciesService.TopicPoliciesServiceDisabled {
+
+        static final Set<TopicName> FAILED_TOPICS = 
ConcurrentHashMap.newKeySet();
+
+        @Override
+        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(TopicName topicName, GetType type) {
+            if (FAILED_TOPICS.contains(topicName)) {
+                // Only fail once
+                FAILED_TOPICS.remove(topicName);
+                return CompletableFuture.failedFuture(new 
RuntimeException("injected failure for " + topicName));
+            }
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+    }
 }
 

Reply via email to