This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dd151a5a5323076d2faa733bd96182ff685b56ee
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Tue Aug 22 22:08:09 2023 +0800

    [fix][broker] Fix potential case cause retention policy not working on topic level (#21041)
    
    (cherry picked from commit d3a6df3c2a8f99ab3e5b7f6d826533ffd23328dc)
---
 .../broker/service/persistent/PersistentTopic.java | 22 +++++++--------
 .../service/persistent/PersistentTopicTest.java    | 31 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ffb6828c8c3..3e3363b8c5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1535,7 +1535,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
         CompletableFuture<Void> result = new CompletableFuture<Void>();
         checkReplication().thenAccept(res -> {
-            log.info("[{}] Policies updated successfully", topic);
             result.complete(null);
         }).exceptionally(th -> {
             log.error("[{}] Policies update failed {}, scheduled retry in {} 
seconds", topic, th.getMessage(),
@@ -1555,7 +1554,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return messageDeduplication.checkStatus();
     }
 
-    private CompletableFuture<Void> checkPersistencePolicies() {
+    @VisibleForTesting
+    CompletableFuture<Void> checkPersistencePolicies() {
         TopicName topicName = TopicName.get(topic);
         CompletableFuture<Void> future = new CompletableFuture<>();
         brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
@@ -3486,16 +3486,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
             shadowReplicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
             checkMessageExpiry();
-            checkReplicationAndRetryOnFailure();
-
-            checkDeduplicationStatus();
-
-            preCreateSubscriptionForCompactionIfNeeded();
-
-            // update managed ledger config
-            checkPersistencePolicies();
-        }).exceptionally(e -> {
-            Throwable t = e instanceof CompletionException ? e.getCause() : e;
+        })
+        .thenCompose(__ -> checkReplicationAndRetryOnFailure())
+        .thenCompose(__ -> checkDeduplicationStatus())
+        .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
+        .thenCompose(__ -> checkPersistencePolicies())
+        .thenAccept(__ -> log.info("[{}] Policies updated successfully", 
topic))
+        .exceptionally(e -> {
+            Throwable t = FutureUtil.unwrapCompletionException(e);
             log.error("[{}] update topic policy error: {}", topic, 
t.getMessage(), t);
             return null;
         });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 412b8207e34..41704af0b8b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -60,6 +60,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -77,7 +78,9 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.junit.Assert;
@@ -603,4 +606,32 @@ public class PersistentTopicTest extends BrokerTestBase {
             return !topic.getManagedLedger().getCursors().iterator().hasNext();
         });
     }
+
+    @Test
+    public void testCheckPersistencePolicies() throws Exception {
+        final String myNamespace = "prop/ns";
+        admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
+        final String topic = "persistent://" + myNamespace + "/testConfig" + 
UUID.randomUUID();
+        conf.setForceDeleteNamespaceAllowed(true);
+        pulsarClient.newProducer().topic(topic).create().close();
+        RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
+        PersistentTopic persistentTopic = spy((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get());
+        TopicPoliciesService policiesService = 
spy(pulsar.getTopicPoliciesService());
+        doReturn(policiesService).when(pulsar).getTopicPoliciesService();
+        TopicPolicies policies = new TopicPolicies();
+        policies.setRetentionPolicies(retentionPolicies);
+        
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
+        persistentTopic.onUpdate(policies);
+        verify(persistentTopic, times(1)).checkPersistencePolicies();
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
 1L);
+            
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
 TimeUnit.MINUTES.toMillis(1));
+        });
+        // throw exception
+        doReturn(CompletableFuture.failedFuture(new 
RuntimeException())).when(persistentTopic).checkPersistencePolicies();
+        policies.setRetentionPolicies(new RetentionPolicies(2, 2));
+        persistentTopic.onUpdate(policies);
+        
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
 1L);
+        
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
 TimeUnit.MINUTES.toMillis(1));
+    }
 }

Reply via email to