lhotari commented on code in PR #24427:
URL: https://github.com/apache/pulsar/pull/24427#discussion_r2298052247


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -178,92 +190,257 @@ public CompletableFuture<Void> 
deleteTopicPoliciesAsync(TopicName topicName,
                 log.info("Skip delete topic-level policies because {} has been 
removed before", changeEvents);
                 return CompletableFuture.completedFuture(null);
             }
-            return sendTopicPolicyEvent(topicName, ActionType.DELETE, null,
-                    keepGlobalPolicies);
+            // delete local policy
+            return updateTopicPoliciesAsync(topicName, null, false, 
ActionType.DELETE, true)
+                    .thenCompose(__ ->
+                            // delete global policy
+                            updateTopicPoliciesAsync(topicName, null, true, 
ActionType.DELETE, true));
         });
     }
 
     @Override
-    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName, TopicPolicies policies) {
+    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName,
+                                                            boolean 
isGlobalPolicy,
+                                                            boolean 
skipUpdateWhenTopicPolicyDoesntExist,
+                                                            
Consumer<TopicPolicies> policyUpdater) {
         if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
             return CompletableFuture.failedFuture(new 
BrokerServiceException.NotAllowedException(
                     "Not allowed to update topic policy for the heartbeat 
topic"));
         }
-        return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies, 
false);
+        return updateTopicPoliciesAsync(topicName, policyUpdater, 
isGlobalPolicy, ActionType.UPDATE,
+                skipUpdateWhenTopicPolicyDoesntExist);
     }
 
-    private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, 
ActionType actionType,
-         @Nullable TopicPolicies policies, boolean 
keepGlobalPoliciesAfterDeleting) {
-        return pulsarService.getPulsarResources().getNamespaceResources()
-                .getPoliciesAsync(topicName.getNamespaceObject())
-                .thenCompose(namespacePolicies -> {
-                    if (namespacePolicies.isPresent() && 
namespacePolicies.get().deleted) {
-                        log.debug("[{}] skip sending topic policy event since 
the namespace is deleted", topicName);
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    try {
-                        createSystemTopicFactoryIfNeeded();
-                    } catch (PulsarServerException e) {
-                        return CompletableFuture.failedFuture(e);
-                    }
-                    CompletableFuture<Void> result = new CompletableFuture<>();
-                    writerCaches.get(topicName.getNamespaceObject())
-                            .whenComplete((writer, cause) -> {
-                                if (cause != null) {
-                                    
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
-                                    result.completeExceptionally(cause);
-                                } else {
-                                    CompletableFuture<MessageId> writeFuture =
-                                            
sendTopicPolicyEventInternal(topicName, actionType, writer, policies,
-                                                    
keepGlobalPoliciesAfterDeleting);
-                                    writeFuture.whenComplete((messageId, e) -> 
{
-                                        if (e != null) {
-                                            result.completeExceptionally(e);
-                                        } else {
-                                            if (messageId != null) {
-                                                result.complete(null);
+    private CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName,
+                                                             
Consumer<TopicPolicies> policyUpdater,
+                                                             boolean 
isGlobalPolicy,
+                                                             ActionType 
actionType,
+                                                             boolean 
skipUpdateWhenTopicPolicyDoesntExist) {
+        if (closed.get()) {
+            return CompletableFuture.failedFuture(new 
BrokerServiceException(getClass().getName() + " is closed."));
+        }
+        TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+        Pair<TopicName, Boolean> sequencerKey = Pair.of(partitionedTopicName, 
isGlobalPolicy);
+
+        CompletableFuture<Void> operationFuture = new CompletableFuture<>();
+
+        // Chain the operation on the sequencer for the specific topic and 
policy type
+        topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) 
-> {
+            CompletableFuture<Void> chain = (existingFuture == null || 
existingFuture.isDone())
+                    ? CompletableFuture.completedFuture(null)
+                    : existingFuture;
+
+            return chain.thenCompose(v ->
+                    pulsarService.getPulsarResources().getNamespaceResources()
+                            .getPoliciesAsync(topicName.getNamespaceObject())
+                            .thenCompose(namespacePolicies -> {
+                                if (namespacePolicies.isPresent() && 
namespacePolicies.get().deleted) {
+                                    log.debug("[{}] skip sending topic policy 
event since the namespace is deleted",
+                                            topicName);
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+                                return 
getTopicPoliciesAsync(partitionedTopicName,
+                                        isGlobalPolicy ? GetType.GLOBAL_ONLY : 
GetType.LOCAL_ONLY)
+                                        .thenCompose(currentPolicies -> {
+                                            if (currentPolicies.isEmpty() && 
skipUpdateWhenTopicPolicyDoesntExist) {
+                                                log.debug("[{}] No existing 
policies, skipping sending event as "
+                                                        + "requested", 
topicName);
+                                                return 
CompletableFuture.completedFuture(null);
+                                            }
+                                            TopicPolicies policiesToUpdate;
+                                            if (actionType == 
ActionType.DELETE) {
+                                                policiesToUpdate = null; // 
For delete, policies object is null
                                             } else {
-                                                result.completeExceptionally(
-                                                        new 
RuntimeException("Got message id is null."));
+                                                policiesToUpdate = 
currentPolicies.isEmpty()
+                                                        ? 
createTopicPolicies(isGlobalPolicy)
+                                                        : 
currentPolicies.get().clone();
+                                                
policyUpdater.accept(policiesToUpdate);
                                             }
-                                        }
-                                    });
-                            }
-                    });
-                    return result;
-                });
+                                            return 
sendTopicPolicyEventInternal(topicName, actionType, policiesToUpdate,
+                                                    isGlobalPolicy);
+                                        })
+                                        .thenCompose(messageId -> {
+                                            if (messageId == null) {
+                                                return 
CompletableFuture.completedFuture(null);
+                                            } else {
+                                                // asynchronously wait until 
the message ID is read by the reader
+                                                return 
untilMessageIdHasBeenRead(topicName.getNamespaceObject(),
+                                                        messageId);
+                                            }
+                                        });
+                            }));
+        }).whenComplete((res, ex) -> {
+            // remove the current future from the sequencer map, if it is done
+            // this would remove the future from the sequencer map when the 
last operation completes in the chained
+            // future
+            topicPolicyUpdateSequencer.compute(sequencerKey, (key, 
existingFuture) -> {
+                if (existingFuture != null && existingFuture.isDone()) {
+                    // Remove the completed future from the sequencer map
+                    return null;
+                }
+                return existingFuture;
+            });
+            if (ex != null) {
+                
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());

Review Comment:
   oh yes, checking again



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to