This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b6f9fceeb5 [fix][broker] Allow intermittent error from topic policies
service when loading topics (#24829)
7b6f9fceeb5 is described below
commit 7b6f9fceeb56a1e85bdd917e97393ab8cb19544b
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Oct 8 23:42:11 2025 +0800
[fix][broker] Allow intermittent error from topic policies service when
loading topics (#24829)
---
.../pulsar/broker/service/BrokerService.java | 18 ++++++++-------
.../pulsar/broker/service/BrokerServiceTest.java | 27 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bd38283ae41..5c19de44341 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1168,14 +1168,7 @@ public class BrokerService implements Closeable {
// The topic level policies are not needed now, but the
meaning of calling
// "getTopicPoliciesBypassSystemTopic" will wait for
system topic policies initialization.
getTopicPoliciesBypassSystemTopic(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY)
- .exceptionally(ex -> {
- final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
- final String errorInfo = String.format("Topic creation
encountered an exception by initialize"
- + " topic policies service. topic_name=%s
error_message=%s", topicName,
- rc.getMessage());
- log.error(errorInfo, rc);
- throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
- }).thenRun(() -> {
+ .thenRun(() -> {
final var inserted = new MutableBoolean(false);
final var cachedFuture =
topics.computeIfAbsent(topicName.toString(), ___ -> {
inserted.setTrue();
@@ -1195,6 +1188,15 @@ public class BrokerService implements Closeable {
}
});
}
+ }).exceptionally(e -> {
+ pulsar.getExecutor().execute(() ->
topics.remove(topicName.toString(), topicFuture));
+ final Throwable rc =
FutureUtil.unwrapCompletionException(e);
+ final String errorInfo = String.format("Topic creation
encountered an exception by initialize"
+ + " topic policies service.
topic_name=%s error_message=%s", topicName,
+ rc.getMessage());
+ log.error(errorInfo, rc);
+ topicFuture.completeExceptionally(rc);
+ return null;
});
});
return topicFuture;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 62dcc37f38e..56a08eac209 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -125,6 +125,7 @@ import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -148,7 +149,8 @@ public class BrokerServiceTest extends BrokerTestBase {
@Override
protected void setup() throws Exception {
conf.setSystemTopicEnabled(false);
- conf.setTopicLevelPoliciesEnabled(false);
+ conf.setTopicLevelPoliciesEnabled(true);
+
conf.setTopicPoliciesServiceClassName(MockTopicPoliciesService.class.getName());
super.baseSetup();
}
@@ -2036,5 +2038,28 @@ public class BrokerServiceTest extends BrokerTestBase {
retryStrategically((test) -> sync.getProducer() != null, 1000, 10);
assertNotNull(sync.getProducer());
}
+
+ @Test
+ public void testGetTopicWhenTopicPoliciesFail() throws Exception {
+ final var topicName =
TopicName.get("prop/ns-abc/test-get-topic-when-topic-policies-fail");
+ MockTopicPoliciesService.FAILED_TOPICS.add(topicName);
+ @Cleanup final var producer =
pulsarClient.newProducer().topic(topicName.toString()).create();
+
assertFalse(MockTopicPoliciesService.FAILED_TOPICS.contains(topicName));
+ }
+
+ static class MockTopicPoliciesService extends
TopicPoliciesService.TopicPoliciesServiceDisabled {
+
+ static final Set<TopicName> FAILED_TOPICS =
ConcurrentHashMap.newKeySet();
+
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(TopicName topicName, GetType type) {
+ if (FAILED_TOPICS.contains(topicName)) {
+ // Only fail once
+ FAILED_TOPICS.remove(topicName);
+ return CompletableFuture.failedFuture(new
RuntimeException("injected failure for " + topicName));
+ }
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ }
}