lhotari commented on code in PR #24427:
URL: https://github.com/apache/pulsar/pull/24427#discussion_r2297516637
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -3534,66 +3496,53 @@ protected CompletableFuture<RetentionPolicies>
internalGetRetention(boolean appl
}));
}
- protected CompletableFuture<Void> internalSetRetention(RetentionPolicies
retention, boolean isGlobal) {
- validateRetentionPolicies(retention);
- if (retention == null) {
+ protected CompletableFuture<Void> internalSetRetention(RetentionPolicies
retentionToSet, boolean isGlobal) {
+ validateRetentionPolicies(retentionToSet);
+ if (retentionToSet == null) {
return CompletableFuture.completedFuture(null);
}
- return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
- .thenCompose(op -> {
- TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
- for (BacklogQuota.BacklogQuotaType backlogQuotaType :
BacklogQuota.BacklogQuotaType.values()) {
- BacklogQuota backlogQuota =
topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name());
- if (backlogQuota == null) {
- Policies policies =
getNamespacePolicies(topicName.getNamespaceObject());
- backlogQuota =
policies.backlog_quota_map.get(backlogQuotaType);
- }
- if (!checkBacklogQuota(backlogQuota, retention)) {
- log.warn(
- "[{}] Failed to update retention quota
configuration for topic {}: "
- + "conflicts with retention quota",
- clientAppId(), topicName);
- return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
- "Retention Quota must exceed configured
backlog quota for topic. "
- + "Please increase retention quota and
retry"));
- }
- }
- topicPolicies.setRetentionPolicies(retention);
- topicPolicies.setIsGlobal(isGlobal);
- return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
- });
+ return getNamespacePoliciesAsync(topicName.getNamespaceObject())
+ .thenCompose(currentNamespacePolicies ->
getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenCompose(op -> {
+ TopicPolicies currentTopicPolicies =
op.orElseGet(TopicPolicies::new);
+ for (BacklogQuota.BacklogQuotaType
backlogQuotaType :
+ BacklogQuota.BacklogQuotaType.values()) {
+ BacklogQuota backlogQuota =
+
currentTopicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name());
+ if (backlogQuota == null) {
+ backlogQuota =
currentNamespacePolicies.backlog_quota_map.get(backlogQuotaType);
+ }
+ if (!checkBacklogQuota(backlogQuota,
retentionToSet)) {
+ log.warn("[{}] Failed to update retention
quota configuration for topic {}: "
+ + "conflicts with retention
quota", clientAppId(), topicName);
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Retention Quota must exceed
configured backlog quota for topic. "
+ + "Please increase
retention quota and retry"));
+ }
+ }
+ return pulsar().getTopicPoliciesService()
+ .updateTopicPoliciesAsync(topicName,
isGlobal, retentionToSet == null, policies -> {
Review Comment:
fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org