This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new c4bdf2ce1af [fix][broker] Topic policy reader can't recover when get
any exception (#17562)
c4bdf2ce1af is described below
commit c4bdf2ce1af1512265952643710948684b7bb9b6
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Sep 13 09:06:25 2022 +0800
[fix][broker] Topic policy reader can't recover when get any exception
(#17562)
---
.../SystemTopicBasedTopicPoliciesService.java | 28 ++++++++++++----------
1 file changed, 16 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 93f97bbce07..0e0e4950c36 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -252,20 +253,19 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return result;
}
- private void prepareInitPoliciesCache(NamespaceName namespace,
CompletableFuture<Void> result) {
+ private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace,
CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
- readerCompletableFuture.whenComplete((reader, ex) -> {
- if (ex != null) {
- log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
- result.completeExceptionally(ex);
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
- } else {
- initPolicesCache(reader, result);
- result.thenRun(() -> readMorePolicies(reader));
- }
+ readerCompletableFuture.thenAccept(reader -> {
+ initPolicesCache(reader, result);
+ result.thenRun(() -> readMorePolicies(reader));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
+ cleanCacheAndCloseReader(namespace, false);
+ result.completeExceptionally(ex);
+ return null;
});
}
}
@@ -367,14 +367,18 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
- private void cleanCacheAndCloseReader(NamespaceName namespace, boolean
cleanOwnedBundlesCount) {
+ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace,
boolean cleanOwnedBundlesCount) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture
= readerCaches.remove(namespace);
policiesCache.entrySet().removeIf(entry ->
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
if (cleanOwnedBundlesCount) {
ownedBundlesCountPerNamespace.remove(namespace);
}
if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
- readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+ readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
+ .exceptionally(ex -> {
+ log.warn("[{}] Close change_event reader fail.",
namespace, ex);
+ return null;
+ });
}
policyCacheInitMap.remove(namespace);
}