Jason918 commented on a change in pull request #13710: URL: https://github.com/apache/pulsar/pull/13710#discussion_r782767023
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java ########## @@ -1438,45 +1438,15 @@ public void checkMessageDeduplicationInfo() { } public CompletableFuture<Boolean> isCompactionEnabled() { - Optional<Long> topicCompactionThreshold = getTopicPolicies() Review comment: We can change this method to return a plain `boolean`, since it is no longer blocking. ########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java ########## @@ -1868,14 +1868,8 @@ public void testCompactionTriggeredAfterThresholdFirstInvocation() throws Except Compactor compactor = pulsar.getCompactor(); doReturn(compactPromise).when(compactor).compact(anyString()); - Policies policies = new Policies(); - policies.compaction_threshold = 1L; - - NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); - NamespaceName ns = TopicName.get(successTopicName).getNamespaceObject(); - doReturn(Optional.of(policies)).when(nsr).getPolicies(ns); - PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + topic.topicPolicies.getCompactionThreshold().updateNamespaceValue(1L); Review comment: You can use `topic.initialize()` to trigger the initialization. 
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java ########## @@ -1438,45 +1438,15 @@ public void checkMessageDeduplicationInfo() { } public CompletableFuture<Boolean> isCompactionEnabled() { - Optional<Long> topicCompactionThreshold = getTopicPolicies() - .map(TopicPolicies::getCompactionThreshold); - if (topicCompactionThreshold.isPresent() && topicCompactionThreshold.get() > 0) { - return CompletableFuture.completedFuture(true); - } - - TopicName topicName = TopicName.get(topic); - return brokerService.getPulsar().getPulsarResources().getNamespaceResources() - .getPoliciesAsync(topicName.getNamespaceObject()) - .thenApply(policies -> { - if (policies.isPresent()) { - return policies.get().compaction_threshold != null - && policies.get().compaction_threshold > 0; - } else { - // Check broker default - return brokerService.pulsar().getConfiguration() - .getBrokerServiceCompactionThresholdInBytes() > 0; - } - }); + Long topicCompactionThreshold = topicPolicies.getCompactionThreshold().get(); + return CompletableFuture.completedFuture(topicCompactionThreshold != null && topicCompactionThreshold > 0); } public void checkCompaction() { TopicName name = TopicName.get(topic); try { - Long compactionThreshold = getTopicPolicies() - .map(TopicPolicies::getCompactionThreshold) - .orElse(null); - if (compactionThreshold == null) { - Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources() - .getPolicies(name.getNamespaceObject()) - .orElseThrow(() -> new MetadataStoreException.NotFoundException()); - compactionThreshold = policies.compaction_threshold; - } - if (compactionThreshold == null) { - compactionThreshold = brokerService.pulsar().getConfiguration() - .getBrokerServiceCompactionThresholdInBytes(); - } - - if (isSystemTopic() || compactionThreshold != 0 + Long compactionThreshold = topicPolicies.getCompactionThreshold().get(); Review comment: `compactionThreshold` can't be 
null since the broker-level value is of type `int`. ########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java ########## @@ -1868,14 +1868,8 @@ public void testCompactionTriggeredAfterThresholdFirstInvocation() throws Except Compactor compactor = pulsar.getCompactor(); doReturn(compactPromise).when(compactor).compact(anyString()); - Policies policies = new Policies(); - policies.compaction_threshold = 1L; - - NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); - NamespaceName ns = TopicName.get(successTopicName).getNamespaceObject(); - doReturn(Optional.of(policies)).when(nsr).getPolicies(ns); - PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + topic.topicPolicies.getCompactionThreshold().updateNamespaceValue(1L); Review comment: This does not cover the initialization procedure of `topicPolicies.getCompactionThreshold()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org