BewareMyPower commented on code in PR #24658:
URL: https://github.com/apache/pulsar/pull/24658#discussion_r2297135811
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -473,15 +478,16 @@ public boolean test(NamespaceBundle namespaceBundle) {
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent>
reader, CompletableFuture<Void> future) {
if (closed.get()) {
future.completeExceptionally(new
BrokerServiceException(getClass().getName() + " is closed."));
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
+
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
true);
return;
}
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
if (ex != null) {
log.error("[{}] Failed to check the move events for the system
topic",
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
+
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
+ isAlreadyClosedException(ex));
Review Comment:
I've drafted a change, could you take a look?
```diff
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index e8f6f5c0c4..231c9ed93e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -403,8 +403,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (closed.get()) {
return null;
}
- cleanCache(namespace, false,
-
readerCompletableFuture.isCompletedExceptionally());
+ if
(readerCompletableFuture.isCompletedExceptionally()) {
+ cleanReaderCache(namespace);
+ } else {
+
policyCacheInitMap.remove(namespace);
+ }
} catch (Throwable cleanupEx) {
// Adding this catch to avoid break
callback chain
log.error("[{}] Failed to cleanup
reader on __change_events topic",
@@ -458,7 +461,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
AtomicInteger bundlesCount =
ownedBundlesCountPerNamespace.get(namespace);
if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
- cleanCache(namespace, true, true, true);
+ writerCaches.synchronous().invalidate(namespace);
+ ownedBundlesCountPerNamespace.remove(namespace);
+ cleanReaderCache(namespace);
}
}
@@ -488,7 +493,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent>
reader, CompletableFuture<Void> future) {
if (closed.get()) {
future.completeExceptionally(new
BrokerServiceException(getClass().getName() + " is closed."));
-
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
true);
+
cleanReaderCache(reader.getSystemTopic().getTopicName().getNamespaceObject());
return;
}
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
@@ -496,8 +501,12 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.error("[{}] Failed to check the move events for the
system topic",
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
-
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
- isAlreadyClosedException(ex));
+ final var namespace =
reader.getSystemTopic().getTopicName().getNamespaceObject();
+ if (isAlreadyClosedException(ex)) {
+ cleanReaderCache(namespace);
+ } else {
+ policyCacheInitMap.remove(namespace);
+ }
return;
}
if (hasMore) {
@@ -516,8 +525,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.error("[{}] Failed to read event from the system
topic.",
reader.getSystemTopic().getTopicName(), e);
future.completeExceptionally(e);
-
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
- isAlreadyClosedException(e));
+ if (isAlreadyClosedException(e)) {
+
cleanReaderCache(reader.getSystemTopic().getTopicName().getNamespaceObject());
+ }
return null;
});
} else {
@@ -543,26 +553,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
- private void cleanCache(@NonNull NamespaceName namespace, boolean
cleanOwnedBundlesCount,
- boolean closeReaderCache) {
- cleanCache(namespace, cleanOwnedBundlesCount, false,
closeReaderCache);
- }
-
- private void cleanCache(@NonNull NamespaceName namespace, boolean
cleanOwnedBundlesCount,
- boolean cleanWriterCache, boolean
cleanReaderCache) {
- if (cleanWriterCache) {
- writerCaches.synchronous().invalidate(namespace);
- }
-
- if (cleanOwnedBundlesCount) {
- ownedBundlesCountPerNamespace.remove(namespace);
- }
-
- if (!cleanReaderCache) {
- policyCacheInitMap.remove(namespace);
- return;
- }
-
+ private void cleanReaderCache(@NonNull NamespaceName namespace) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerFuture = readerCaches.remove(namespace);
policyCacheInitMap.compute(namespace, (k, v) -> {
policiesCache.entrySet().removeIf(entry ->
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
@@ -586,7 +577,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
*/
private void
readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
if (closed.get()) {
-
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
true);
+
cleanReaderCache(reader.getSystemTopic().getTopicName().getNamespaceObject());
return;
}
reader.readNextAsync()
@@ -605,8 +596,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (isAlreadyClosedException(ex)) {
log.info("Closing the topic policies reader for
{}",
reader.getSystemTopic().getTopicName());
- cleanCache(
-
reader.getSystemTopic().getTopicName().getNamespaceObject(), false, true);
+
cleanReaderCache(reader.getSystemTopic().getTopicName().getNamespaceObject());
} else {
log.warn("Read more topic polices exception,
read again.", ex);
readMorePoliciesAsync(reader);
```
As you can see, there is actually three cases:
1. Clear the reader cache
2. Clear the writer cache, reader cache and owner bundle cache
3. Clear the reader cache if the exception is "already closed", otherwise,
just clear `policyCacheInitMap`
With the new `cleanCache` method you've added in this PR, there could be
technically 8 possible combinations, which makes code even harder to read than
existing code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]