BewareMyPower commented on code in PR #24427:
URL: https://github.com/apache/pulsar/pull/24427#discussion_r2297326466


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -3534,66 +3496,53 @@ protected CompletableFuture<RetentionPolicies> 
internalGetRetention(boolean appl
             }));
     }
 
-    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies 
retention, boolean isGlobal) {
-        validateRetentionPolicies(retention);
-        if (retention == null) {
+    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies 
retentionToSet, boolean isGlobal) {
+        validateRetentionPolicies(retentionToSet);
+        if (retentionToSet == null) {
             return CompletableFuture.completedFuture(null);
         }
-        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
-            .thenCompose(op -> {
-                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : 
BacklogQuota.BacklogQuotaType.values()) {
-                    BacklogQuota backlogQuota = 
topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name());
-                    if (backlogQuota == null) {
-                        Policies policies = 
getNamespacePolicies(topicName.getNamespaceObject());
-                        backlogQuota = 
policies.backlog_quota_map.get(backlogQuotaType);
-                    }
-                    if (!checkBacklogQuota(backlogQuota, retention)) {
-                        log.warn(
-                                "[{}] Failed to update retention quota 
configuration for topic {}: "
-                                        + "conflicts with retention quota",
-                                clientAppId(), topicName);
-                        return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
-                                "Retention Quota must exceed configured 
backlog quota for topic. "
-                                        + "Please increase retention quota and 
retry"));
-                    }
-                }
-                topicPolicies.setRetentionPolicies(retention);
-                topicPolicies.setIsGlobal(isGlobal);
-                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
-            });
+        return getNamespacePoliciesAsync(topicName.getNamespaceObject())
+                .thenCompose(currentNamespacePolicies -> 
getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                        .thenCompose(op -> {
+                            TopicPolicies currentTopicPolicies = 
op.orElseGet(TopicPolicies::new);
+                            for (BacklogQuota.BacklogQuotaType 
backlogQuotaType :
+                                    BacklogQuota.BacklogQuotaType.values()) {
+                                BacklogQuota backlogQuota =
+                                        
currentTopicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name());
+                                if (backlogQuota == null) {
+                                    backlogQuota = 
currentNamespacePolicies.backlog_quota_map.get(backlogQuotaType);
+                                }
+                                if (!checkBacklogQuota(backlogQuota, 
retentionToSet)) {
+                                    log.warn("[{}] Failed to update retention 
quota configuration for topic {}: "
+                                            + "conflicts with retention 
quota", clientAppId(), topicName);
+                                    return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                                            "Retention Quota must exceed 
configured backlog quota for topic. "
+                                                    + "Please increase 
retention quota and retry"));
+                                }
+                            }
+                            return pulsar().getTopicPoliciesService()
+                                    .updateTopicPoliciesAsync(topicName, 
isGlobal, retentionToSet == null, policies -> {

Review Comment:
   `retentionToSet == null` is always false here, because the method has already returned early when `retentionToSet` is null.
   
   The previous logic here is a bit confusing: when `retentionToSet` is 
null, this method does nothing rather than deleting the retention policy. 
This method can only be called by `PersistentTopics#setRetention`, whose 
`retention` argument should never be null.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5417,29 +5242,18 @@ protected CompletableFuture<EntryFilters> 
internalGetEntryFilters(boolean applie
                 });
     }
 
-    protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters 
entryFilters,
+    protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters 
entryFiltersToSet,
                                                               boolean 
isGlobal) {
-        validateEntryFilters(entryFilters);
-        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
-                .thenCompose(op -> {
-                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
-                    topicPolicies.setEntryFilters(entryFilters);
-                    topicPolicies.setIsGlobal(isGlobal);
-                    return pulsar().getTopicPoliciesService()
-                            .updateTopicPoliciesAsync(topicName, 
topicPolicies);
-                });
+        validateEntryFilters(entryFiltersToSet);
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
isGlobal, false, policies -> {
+            policies.setEntryFilters(entryFiltersToSet);
+        });
     }
 
     protected CompletableFuture<Void> internalRemoveEntryFilters(boolean 
isGlobal) {
-        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
-                        .thenCompose(op -> {
-                            if (!op.isPresent()) {
-                                return CompletableFuture.completedFuture(null);
-                            }
-                            op.get().setEntryFilters(null);
-                            op.get().setIsGlobal(isGlobal);
-                            return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
-                        });
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
isGlobal, false, policies -> {

Review Comment:
   ```suggestion
           return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
isGlobal, true, policies -> {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to