BewareMyPower commented on code in PR #24658:
URL: https://github.com/apache/pulsar/pull/24658#discussion_r2297135811


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -473,15 +478,16 @@ public boolean test(NamespaceBundle namespaceBundle) {
     private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> 
reader, CompletableFuture<Void> future) {
         if (closed.get()) {
             future.completeExceptionally(new 
BrokerServiceException(getClass().getName() + " is closed."));
-            
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
+            
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false, 
true);
             return;
         }
         reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
             if (ex != null) {
                 log.error("[{}] Failed to check the move events for the system 
topic",
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
-                
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
+                
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
+                        isAlreadyClosedException(ex));

Review Comment:
   I've drafted a change, could you take a look?
   
   ```diff
   diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
   index e8f6f5c0c4..231c9ed93e 100644
   --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
   +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
   @@ -403,8 +403,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                            if (closed.get()) {
                                                return null;
                                            }
   -                                        cleanCache(namespace, false,
   -                                                
readerCompletableFuture.isCompletedExceptionally());
   +                                        if 
(readerCompletableFuture.isCompletedExceptionally()) {
   +                                            cleanReaderCache(namespace);
   +                                        } else {
   +                                            
policyCacheInitMap.remove(namespace);
   +                                        }
                                        } catch (Throwable cleanupEx) {
                                            // Adding this catch to avoid break 
callback chain
                                            log.error("[{}] Failed to cleanup 
reader on __change_events topic",
   @@ -458,7 +461,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
            }
            AtomicInteger bundlesCount = 
ownedBundlesCountPerNamespace.get(namespace);
            if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
   -            cleanCache(namespace, true, true, true);
   +            writerCaches.synchronous().invalidate(namespace);
   +            ownedBundlesCountPerNamespace.remove(namespace);
   +            cleanReaderCache(namespace);
            }
        }
   
   @@ -488,7 +493,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
        private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> 
reader, CompletableFuture<Void> future) {
            if (closed.get()) {
                future.completeExceptionally(new 
BrokerServiceException(getClass().getName() + " is closed."));
   -            
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false, 
true);
   +            
cleanReaderCache(reader.getSystemTopic().getTopicName().getNamespaceObject());
                return;
            }
            reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
   @@ -496,8 +501,12 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                    log.error("[{}] Failed to check the move events for the 
system topic",
                            reader.getSystemTopic().getTopicName(), ex);
                    future.completeExceptionally(ex);
   -                
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
   -                        isAlreadyClosedException(ex));
   +                final var namespace = 
reader.getSystemTopic().getTopicName().getNamespaceObject();
   +                if (isAlreadyClosedException(ex)) {
   +                    cleanReaderCache(namespace);
   +                } else {
   +                    policyCacheInitMap.remove(namespace);
   +                }
                    return;
                }
                if (hasMore) {
   @@ -516,8 +525,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                        log.error("[{}] Failed to read event from the system 
topic.",
                                reader.getSystemTopic().getTopicName(), e);
                        future.completeExceptionally(e);
   -                    
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false,
   -                            isAlreadyClosedException(e));
   +                    if (isAlreadyClosedException(e)) {
   +                        
cleanReaderCache(reader.getSystemTopic().getTopicName().getNamespaceObject());
   +                    }
                        return null;
                    });
                } else {
   @@ -543,26 +553,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
            });
        }
   
   -    private void cleanCache(@NonNull NamespaceName namespace, boolean 
cleanOwnedBundlesCount,
   -                            boolean closeReaderCache) {
   -        cleanCache(namespace, cleanOwnedBundlesCount, false, 
closeReaderCache);
   -    }
   -
   -    private void cleanCache(@NonNull NamespaceName namespace, boolean 
cleanOwnedBundlesCount,
   -                            boolean cleanWriterCache, boolean 
cleanReaderCache) {
   -        if (cleanWriterCache) {
   -            writerCaches.synchronous().invalidate(namespace);
   -        }
   -
   -        if (cleanOwnedBundlesCount) {
   -            ownedBundlesCountPerNamespace.remove(namespace);
   -        }
   -
   -        if (!cleanReaderCache) {
   -            policyCacheInitMap.remove(namespace);
   -            return;
   -        }
   -
   +    private void cleanReaderCache(@NonNull NamespaceName namespace) {
            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerFuture = readerCaches.remove(namespace);
            policyCacheInitMap.compute(namespace, (k, v) -> {
                policiesCache.entrySet().removeIf(entry -> 
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
   @@ -586,7 +577,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         */
        private void 
readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
            if (closed.get()) {
   -            
cleanCache(reader.getSystemTopic().getTopicName().getNamespaceObject(), false, 
true);
   +            
cleanReaderCache(reader.getSystemTopic().getTopicName().getNamespaceObject());
                return;
            }
            reader.readNextAsync()
   @@ -605,8 +596,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                            if (isAlreadyClosedException(ex)) {
                                log.info("Closing the topic policies reader for 
{}",
                                        reader.getSystemTopic().getTopicName());
   -                            cleanCache(
   -                                    
reader.getSystemTopic().getTopicName().getNamespaceObject(), false, true);
   +                            
cleanReaderCache(reader.getSystemTopic().getTopicName().getNamespaceObject());
                            } else {
                                log.warn("Read more topic polices exception, 
read again.", ex);
                                readMorePoliciesAsync(reader);
   ```
   
   As you can see, there are actually three cases:
   1. Clear the reader cache
   2. Clear the writer cache, the reader cache and the owned bundles count
   3. Clear the reader cache if the exception is "already closed", otherwise, 
just clear `policyCacheInitMap`
   
   With the new `cleanCache` method you've added in this PR, there could 
technically be 8 possible combinations, which makes the code even harder to read 
than the existing code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to