This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d1603ad1f60b0a58f3adeb729dc86bdcf0dd40aa
Author: Hao Zhang <[email protected]>
AuthorDate: Mon Mar 3 09:42:59 2025 +0800

    [fix][broker] Fix missing validation when setting retention policy on topic 
level (#24032)
    
    Co-authored-by: 张浩 <[email protected]>
    (cherry picked from commit 1eb7866e1c25f8296a95941b1c0591339da82f80)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 15 +++++++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 15 -----------
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 +
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 30 ++++++++++++++++++++++
 .../broker/service/SubscriptionSeekTest.java       |  2 +-
 5 files changed, 47 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index dc92aeb0c77..5f2f031a2d4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -857,6 +857,21 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
     }
 
+    protected void validateRetentionPolicies(RetentionPolicies retention) {
+        if (retention == null) {
+            return;
+        }
+        checkArgument(retention.getRetentionSizeInMB() >= -1,
+                "Invalid retention policy: size limit must be >= -1");
+        checkArgument(retention.getRetentionTimeInMinutes() >= -1,
+                "Invalid retention policy: time limit must be >= -1");
+        checkArgument((retention.getRetentionTimeInMinutes() != 0 && 
retention.getRetentionSizeInMB() != 0)
+                        || (retention.getRetentionTimeInMinutes() == 0 && 
retention.getRetentionSizeInMB() == 0),
+                "Invalid retention policy: Setting a single time or size limit 
to 0 is invalid when "
+                        + "one of the limits has a non-zero value. Use the 
value of -1 instead of 0 to ignore a "
+                        + "specific limit. To disable retention both limits 
must be set to 0.");
+    }
+
     protected void validateEntryFilters(EntryFilters entryFilters) {
         if (entryFilters == null) {
             // remove entry filters
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ca4c685b280..c866d2d6f8a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1995,21 +1995,6 @@ public abstract class NamespacesBase extends 
AdminResource {
             });
     }
 
-    protected void validateRetentionPolicies(RetentionPolicies retention) {
-        if (retention == null) {
-            return;
-        }
-        checkArgument(retention.getRetentionSizeInMB() >= -1,
-                "Invalid retention policy: size limit must be >= -1");
-        checkArgument(retention.getRetentionTimeInMinutes() >= -1,
-                "Invalid retention policy: time limit must be >= -1");
-        checkArgument((retention.getRetentionTimeInMinutes() != 0 && 
retention.getRetentionSizeInMB() != 0)
-                        || (retention.getRetentionTimeInMinutes() == 0 && 
retention.getRetentionSizeInMB() == 0),
-                "Invalid retention policy: Setting a single time or size limit 
to 0 is invalid when "
-                        + "one of the limits has a non-zero value. Use the 
value of -1 instead of 0 to ignore a "
-                        + "specific limit. To disable retention both limits 
must be set to 0.");
-    }
-
     protected void internalSetDeduplicationSnapshotInterval(Integer interval) {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
         if (interval != null && interval < 0) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e88b1110d0a..ed47650f42a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3489,6 +3489,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalSetRetention(RetentionPolicies 
retention, boolean isGlobal) {
+        validateRetentionPolicies(retention);
         if (retention == null) {
             return CompletableFuture.completedFuture(null);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index dc9a7ec4429..c6a78d275dd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import javax.ws.rs.BadRequestException;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -634,6 +635,35 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
+    @Test
+    public void testRetentionPolicyValidation() throws Exception {
+        // should pass
+        admin.topicPolicies().setRetention(testTopic, new RetentionPolicies());
+        admin.topicPolicies().setRetention(testTopic, new 
RetentionPolicies(-1, -1));
+        admin.topicPolicies().setRetention(testTopic, new RetentionPolicies(1, 
1));
+
+        // should not pass validation
+        assertInvalidRetentionPolicy(testTopic, 1, 0);
+        assertInvalidRetentionPolicy(testTopic, 0, 1);
+        assertInvalidRetentionPolicy(testTopic, -1, 0);
+        assertInvalidRetentionPolicy(testTopic, 0, -1);
+        assertInvalidRetentionPolicy(testTopic, -2, 1);
+        assertInvalidRetentionPolicy(testTopic, 1, -2);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    private void assertInvalidRetentionPolicy(String topicName, int 
retentionTimeInMinutes, int retentionSizeInMB) {
+        try {
+            RetentionPolicies retention = new 
RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB);
+            admin.topicPolicies().setRetention(topicName, retention);
+            fail("Validation should have failed for " + retention);
+        } catch (PulsarAdminException e) {
+            assertTrue(e.getCause() instanceof BadRequestException);
+            assertTrue(e.getMessage().startsWith("Invalid retention policy"));
+        }
+    }
+
     @Test
     public void testRemoveRetention() throws Exception {
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 3a9c5c43f1c..4970dc88188 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -928,7 +928,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
     public void testSeekWillNotEncounteredFencedError() throws Exception {
         String topicName = "persistent://prop/ns-abc/my-topic2";
         admin.topics().createNonPartitionedTopic(topicName);
-        admin.topicPolicies().setRetention(topicName, new 
RetentionPolicies(3600, 0));
+        admin.topicPolicies().setRetention(topicName, new 
RetentionPolicies(3600, -1));
         // Create a pulsar client with a subscription fenced counter.
         ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString());
         AtomicInteger receivedFencedErrorCounter = new AtomicInteger();

Reply via email to