Jason918 commented on a change in pull request #13710:
URL: https://github.com/apache/pulsar/pull/13710#discussion_r782767023



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1438,45 +1438,15 @@ public void checkMessageDeduplicationInfo() {
     }
 
     public CompletableFuture<Boolean> isCompactionEnabled() {
-        Optional<Long> topicCompactionThreshold = getTopicPolicies()

Review comment:
       We can change this method to return bool since it's not blocking any 
more.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
##########
@@ -1868,14 +1868,8 @@ public void 
testCompactionTriggeredAfterThresholdFirstInvocation() throws Except
         Compactor compactor = pulsar.getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
-        Policies policies = new Policies();
-        policies.compaction_threshold = 1L;
-
-        NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
-        NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
-
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        topic.topicPolicies.getCompactionThreshold().updateNamespaceValue(1L);

Review comment:
       You can use `topic.initialize()` to trigger the init.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1438,45 +1438,15 @@ public void checkMessageDeduplicationInfo() {
     }
 
     public CompletableFuture<Boolean> isCompactionEnabled() {
-        Optional<Long> topicCompactionThreshold = getTopicPolicies()
-                .map(TopicPolicies::getCompactionThreshold);
-        if (topicCompactionThreshold.isPresent() && 
topicCompactionThreshold.get() > 0) {
-            return CompletableFuture.completedFuture(true);
-        }
-
-        TopicName topicName = TopicName.get(topic);
-        return 
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
-                .getPoliciesAsync(topicName.getNamespaceObject())
-                .thenApply(policies -> {
-                    if (policies.isPresent()) {
-                        return policies.get().compaction_threshold != null
-                                && policies.get().compaction_threshold > 0;
-                    } else {
-                        // Check broker default
-                        return brokerService.pulsar().getConfiguration()
-                                .getBrokerServiceCompactionThresholdInBytes() 
> 0;
-                    }
-                });
+        Long topicCompactionThreshold = 
topicPolicies.getCompactionThreshold().get();
+        return CompletableFuture.completedFuture(topicCompactionThreshold != 
null && topicCompactionThreshold > 0);
     }
 
     public void checkCompaction() {
         TopicName name = TopicName.get(topic);
         try {
-            Long compactionThreshold = getTopicPolicies()
-                .map(TopicPolicies::getCompactionThreshold)
-                .orElse(null);
-            if (compactionThreshold == null) {
-                Policies policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                        .getPolicies(name.getNamespaceObject())
-                        .orElseThrow(() -> new 
MetadataStoreException.NotFoundException());
-                compactionThreshold = policies.compaction_threshold;
-            }
-            if (compactionThreshold == null) {
-                compactionThreshold = brokerService.pulsar().getConfiguration()
-                        .getBrokerServiceCompactionThresholdInBytes();
-            }
-
-            if (isSystemTopic() || compactionThreshold != 0
+            Long compactionThreshold = 
topicPolicies.getCompactionThreshold().get();

Review comment:
       `compactionThreshold` can't be null  since broker level value is `int` 
type.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
##########
@@ -1868,14 +1868,8 @@ public void 
testCompactionTriggeredAfterThresholdFirstInvocation() throws Except
         Compactor compactor = pulsar.getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
-        Policies policies = new Policies();
-        policies.compaction_threshold = 1L;
-
-        NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
-        NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
-
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        topic.topicPolicies.getCompactionThreshold().updateNamespaceValue(1L);

Review comment:
       This does not cover the init procedure of 
`topicPolicies.getCompactionThreshold()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to