This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dd151a5a5323076d2faa733bd96182ff685b56ee
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Tue Aug 22 22:08:09 2023 +0800

    [fix][broker] Fix potential case cause retention policy not working on topic level (#21041)

    (cherry picked from commit d3a6df3c2a8f99ab3e5b7f6d826533ffd23328dc)
---
 .../broker/service/persistent/PersistentTopic.java | 22 +++++++--------
 .../service/persistent/PersistentTopicTest.java | 31 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ffb6828c8c3..3e3363b8c5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1535,7 +1535,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
         CompletableFuture<Void> result = new CompletableFuture<Void>();
         checkReplication().thenAccept(res -> {
-            log.info("[{}] Policies updated successfully", topic);
             result.complete(null);
         }).exceptionally(th -> {
             log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
@@ -1555,7 +1554,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return messageDeduplication.checkStatus();
     }
 
-    private CompletableFuture<Void> checkPersistencePolicies() {
+    @VisibleForTesting
+    CompletableFuture<Void> checkPersistencePolicies() {
         TopicName topicName = TopicName.get(topic);
         CompletableFuture<Void> future = new CompletableFuture<>();
         brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
@@ -3486,16 +3486,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
             shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
             checkMessageExpiry();
-            checkReplicationAndRetryOnFailure();
-
-            checkDeduplicationStatus();
-
-            preCreateSubscriptionForCompactionIfNeeded();
-
-            // update managed ledger config
-            checkPersistencePolicies();
-        }).exceptionally(e -> {
-            Throwable t = e instanceof CompletionException ? e.getCause() : e;
+        })
+        .thenCompose(__ -> checkReplicationAndRetryOnFailure())
+        .thenCompose(__ -> checkDeduplicationStatus())
+        .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
+        .thenCompose(__ -> checkPersistencePolicies())
+        .thenAccept(__ -> log.info("[{}] Policies updated successfully", topic))
+        .exceptionally(e -> {
+            Throwable t = FutureUtil.unwrapCompletionException(e);
             log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t);
             return null;
         });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 412b8207e34..41704af0b8b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -60,6 +60,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -77,7 +78,9 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.junit.Assert;
@@ -603,4 +606,32 @@ public class PersistentTopicTest extends BrokerTestBase {
             return !topic.getManagedLedger().getCursors().iterator().hasNext();
         });
     }
+
+    @Test
+    public void testCheckPersistencePolicies() throws Exception {
+        final String myNamespace = "prop/ns";
+        admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+        final String topic = "persistent://" + myNamespace + "/testConfig" + UUID.randomUUID();
+        conf.setForceDeleteNamespaceAllowed(true);
+        pulsarClient.newProducer().topic(topic).create().close();
+        RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
+        PersistentTopic persistentTopic = spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get());
+        TopicPoliciesService policiesService = spy(pulsar.getTopicPoliciesService());
+        doReturn(policiesService).when(pulsar).getTopicPoliciesService();
+        TopicPolicies policies = new TopicPolicies();
+        policies.setRetentionPolicies(retentionPolicies);
+        doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
+        persistentTopic.onUpdate(policies);
+        verify(persistentTopic, times(1)).checkPersistencePolicies();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
+            assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
+        });
+        // throw exception
+        doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(persistentTopic).checkPersistencePolicies();
+        policies.setRetentionPolicies(new RetentionPolicies(2, 2));
+        persistentTopic.onUpdate(policies);
+        assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
+        assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
+    }
 }