This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c27ec7d2c32 [feat][broker] add topic-level subscription expiration 
time policy (#25258)
c27ec7d2c32 is described below

commit c27ec7d2c32275df3f221e6cefeb4d3f01551b76
Author: Penghui Li <[email protected]>
AuthorDate: Thu Feb 26 04:16:56 2026 -0800

    [feat][broker] add topic-level subscription expiration time policy (#25258)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  31 ++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  84 ++++++++++++++++
 .../pulsar/broker/service/AbstractTopic.java       |   6 ++
 .../broker/service/persistent/PersistentTopic.java |  18 +---
 .../broker/admin/TopicPoliciesAuthZTest.java       |  84 ++++++++++++++++
 .../broker/admin/TopicPoliciesDisableTest.java     |  27 +++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 112 +++++++++++++++++++++
 .../pulsar/broker/service/PersistentTopicTest.java |   3 +-
 .../apache/pulsar/client/admin/TopicPolicies.java  |  77 ++++++++++++++
 .../client/admin/internal/TopicPoliciesImpl.java   |  57 +++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  12 +++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  |  58 +++++++++++
 .../policies/data/HierarchyTopicPolicies.java      |   2 +
 .../pulsar/common/policies/data/TopicPolicies.java |   5 +
 14 files changed, 558 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 99c0b90d45d..76b3247c599 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3481,6 +3481,37 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
+    protected CompletableFuture<Integer> 
internalGetSubscriptionExpirationTime(boolean applied, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenApply(op -> 
op.map(TopicPolicies::getSubscriptionExpirationTimeInMinutes)
+                        .orElseGet(() -> {
+                            if (applied) {
+                                Integer namespacePolicy = 
getNamespacePolicies(namespaceName)
+                                        .subscription_expiration_time_minutes;
+                                return namespacePolicy == null
+                                        ? 
config().getSubscriptionExpirationTimeMinutes()
+                                        : namespacePolicy;
+                            }
+                            return null;
+                        }));
+    }
+
+    protected CompletableFuture<Void> 
internalSetSubscriptionExpirationTime(Integer expirationTimeToSet,
+                                                                            
boolean isGlobal) {
+        if (expirationTimeToSet != null && expirationTimeToSet < 0) {
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "Invalid value for subscription expiration time"));
+        }
+
+        return pulsar().getTopicPoliciesService()
+                .updateTopicPoliciesAsync(topicName, isGlobal, 
expirationTimeToSet == null, policies -> {
+                    
policies.setSubscriptionExpirationTimeInMinutes(expirationTimeToSet);
+                    log.info("[{}] Successfully set topic subscription 
expiration time: namespace={}, "
+                                    + "topic={}, time={}",
+                            clientAppId(), namespaceName, 
topicName.getLocalName(), expirationTimeToSet);
+                });
+    }
+
     protected CompletableFuture<Void> internalSetMessageTTL(Integer 
ttlInSecondToSet, boolean isGlobal) {
         //Validate message ttl value.
         if (ttlInSecondToSet != null && ttlInSecondToSet < 0) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fc34164b5cf..3d54d73ff45 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2454,6 +2454,90 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                 });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionExpirationTime")
+    @ApiOperation(value = "Get subscription expiration time in minutes for a 
topic", response = Integer.class)
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or topic doesn't 
exist"),
+            @ApiResponse(code = 405, message =
+                    "Topic level policy is disabled, enable the topic level 
policy and retry")})
+    public void getSubscriptionExpirationTime(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("applied") @DefaultValue("false") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
+                .thenCompose(__ -> 
internalGetSubscriptionExpirationTime(applied, isGlobal))
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    
handleTopicPolicyException("getSubscriptionExpirationTime", ex, asyncResponse);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionExpirationTime")
+    @ApiOperation(value = "Set subscription expiration time in minutes for a 
topic")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Operation successful"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or topic doesn't 
exist"),
+            @ApiResponse(code = 405, message =
+                    "Topic level policy is disabled, enable the topic level 
policy and retry"),
+            @ApiResponse(code = 412, message = "Invalid subscription 
expiration time value")})
+    public void setSubscriptionExpirationTime(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription expiration time in minutes", 
required = true)
+            @QueryParam("subscriptionExpirationTime") Integer 
subscriptionExpirationTime,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
+                .thenCompose(__ -> 
internalSetSubscriptionExpirationTime(subscriptionExpirationTime, isGlobal))
+                .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    
handleTopicPolicyException("setSubscriptionExpirationTime", ex, asyncResponse);
+                    return null;
+                });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionExpirationTime")
+    @ApiOperation(value = "Remove subscription expiration time for a topic")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Operation successful"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or topic doesn't 
exist"),
+            @ApiResponse(code = 405, message =
+                    "Topic level policy is disabled, enable the topic level 
policy and retry"),
+            @ApiResponse(code = 412, message = "Invalid subscription 
expiration time value")})
+    public void removeSubscriptionExpirationTime(@Suspended final 
AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateTopicPolicyOperationAsync(topicName, 
PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
+                .thenCompose(__ -> internalSetSubscriptionExpirationTime(null, 
isGlobal))
+                .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    
handleTopicPolicyException("removeSubscriptionExpirationTime", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/messageTTL")
     @ApiOperation(value = "Get message TTL in seconds for a topic", response = 
Integer.class)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 0ac5ae999b0..c6a4c44398d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -281,6 +281,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
                 isGlobalPolicies);
         
topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()),
                 isGlobalPolicies);
+        topicPolicies.getSubscriptionExpirationTimeInMinutes()
+                
.updateTopicValue(normalizeValue(data.getSubscriptionExpirationTimeInMinutes()),
 isGlobalPolicies);
         
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()),
 isGlobalPolicies);
         
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled(),
 isGlobalPolicies);
         topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -324,6 +326,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
                 
.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
         topicPolicies.getMessageTTLInSeconds()
                 
.updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+        topicPolicies.getSubscriptionExpirationTimeInMinutes()
+                
.updateNamespaceValue(normalizeValue(namespacePolicies.subscription_expiration_time_minutes));
         topicPolicies.getMaxSubscriptionsPerTopic()
                 
.updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
         topicPolicies.getMaxProducersPerTopic()
@@ -444,6 +448,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
 
         
topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
         
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
+        topicPolicies.getSubscriptionExpirationTimeInMinutes()
+                
.updateBrokerValue(config.getSubscriptionExpirationTimeMinutes());
         
topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
         
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
         
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5a1e0e940a8..da4d717e798 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3559,22 +3559,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Override
     public void checkInactiveSubscriptions() {
-        TopicName name = TopicName.get(topic);
-        try {
-            Policies policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                    .getPolicies(name.getNamespaceObject())
-                    .orElseThrow(() -> new 
MetadataStoreException.NotFoundException());
-            final int defaultExpirationTime = 
brokerService.pulsar().getConfiguration()
-                    .getSubscriptionExpirationTimeMinutes();
-            final Integer nsExpirationTime = 
policies.subscription_expiration_time_minutes;
-            final long expirationTimeMillis = TimeUnit.MINUTES
-                    .toMillis(nsExpirationTime == null ? defaultExpirationTime 
: nsExpirationTime);
-            checkInactiveSubscriptions(expirationTimeMillis);
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Error getting policies", topic);
-            }
-        }
+        Integer expirationTime = 
topicPolicies.getSubscriptionExpirationTimeInMinutes().get();
+        checkInactiveSubscriptions(TimeUnit.MINUTES.toMillis(expirationTime));
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index b758e878507..b15ecbe9600 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -492,4 +492,88 @@ public final class TopicPoliciesAuthZTest extends 
MockedPulsarStandalone {
         }
 
     }
+
+    @SneakyThrows
+    @Test
+    public void testSubscriptionExpirationTime() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        int expirationTimeInMinutes = 10;
+
+        // test superuser
+        superUserAdmin.topicPolicies().setSubscriptionExpirationTime(topic, 
expirationTimeInMinutes);
+        await().untilAsserted(() -> Assert.assertEquals(
+                
superUserAdmin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
+                expirationTimeInMinutes));
+        superUserAdmin.topicPolicies().removeSubscriptionExpirationTime(topic);
+        await().untilAsserted(() ->
+                
Assert.assertNull(superUserAdmin.topicPolicies().getSubscriptionExpirationTime(topic)));
+
+        // test tenant manager
+        
tenantManagerAdmin.topicPolicies().setSubscriptionExpirationTime(topic, 
expirationTimeInMinutes);
+        await().untilAsserted(() -> Assert.assertEquals(
+                
tenantManagerAdmin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
+                expirationTimeInMinutes));
+        
tenantManagerAdmin.topicPolicies().removeSubscriptionExpirationTime(topic);
+        await().untilAsserted(() ->
+                
Assert.assertNull(tenantManagerAdmin.topicPolicies().getSubscriptionExpirationTime(topic)));
+
+        // test nobody
+        try {
+            subAdmin.topicPolicies().getSubscriptionExpirationTime(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().setSubscriptionExpirationTime(topic, 
expirationTimeInMinutes);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        try {
+            subAdmin.topicPolicies().removeSubscriptionExpirationTime(topic);
+            Assert.fail("unexpected behaviour");
+        } catch (PulsarAdminException ex) {
+            Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+        }
+
+        // test sub user with permissions
+        for (AuthAction action : AuthAction.values()) {
+            
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", 
subject, Set.of(action));
+            try {
+                subAdmin.topicPolicies().getSubscriptionExpirationTime(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                subAdmin.topicPolicies().setSubscriptionExpirationTime(topic, 
expirationTimeInMinutes);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+
+            try {
+                
subAdmin.topicPolicies().removeSubscriptionExpirationTime(topic);
+                Assert.fail("unexpected behaviour");
+            } catch (PulsarAdminException ex) {
+                Assert.assertTrue(ex instanceof 
PulsarAdminException.NotAuthorizedException);
+            }
+            
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", 
subject);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 29c659a23da..98465e233b8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -228,6 +228,33 @@ public class TopicPoliciesDisableTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testSubscriptionExpirationTimeDisabled() throws Exception {
+        int subscriptionExpirationTime = 10;
+        log.info("SubscriptionExpirationTime: {} will set to the topic: {}", 
subscriptionExpirationTime, testTopic);
+
+        try {
+            admin.topicPolicies().setSubscriptionExpirationTime(testTopic, 
subscriptionExpirationTime);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 
HttpStatus.METHOD_NOT_ALLOWED_405);
+        }
+
+        try {
+            admin.topicPolicies().getSubscriptionExpirationTime(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 
HttpStatus.METHOD_NOT_ALLOWED_405);
+        }
+
+        try {
+            admin.topicPolicies().removeSubscriptionExpirationTime(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 
HttpStatus.METHOD_NOT_ALLOWED_405);
+        }
+    }
+
     @Test
     public void testPublishRateDisabled() throws Exception {
         PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 3be7acad770..86d92e421f7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2044,6 +2044,118 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
+    @Test
+    public void testGetSetRemoveSubscriptionExpirationTime() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        int expirationTimeInMinutes = 10;
+
+        admin.topicPolicies().setSubscriptionExpirationTime(topic, 
expirationTimeInMinutes);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
+                
admin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
+                expirationTimeInMinutes));
+
+        admin.topicPolicies().removeSubscriptionExpirationTime(topic);
+        Awaitility.await().untilAsserted(() ->
+                
Assert.assertNull(admin.topicPolicies().getSubscriptionExpirationTime(topic)));
+
+        try {
+            admin.topicPolicies().setSubscriptionExpirationTime(topic, -1);
+            fail("Setting negative subscription expiration time should fail");
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        admin.topics().delete(topic, true);
+    }
+
+    @Test
+    public void testSubscriptionExpirationTimeAppliedAndHierarchy() throws 
Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+
+        int brokerDefault = conf.getSubscriptionExpirationTimeMinutes();
+        
Assert.assertNull(admin.topicPolicies().getSubscriptionExpirationTime(topic));
+        
Assert.assertNull(admin.namespaces().getSubscriptionExpirationTime(myNamespace));
+        
Assert.assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic, 
true).intValue(), brokerDefault);
+        waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> 
assertEquals(
+                
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(), 
Integer.valueOf(brokerDefault)));
+
+        admin.namespaces().setSubscriptionExpirationTime(myNamespace, 11);
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin.namespaces().getSubscriptionExpirationTime(myNamespace).intValue(),
 11));
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic, 
true).intValue(), 11));
+        waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> 
assertEquals(
+                
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(), 
Integer.valueOf(11)));
+
+        admin.topicPolicies().setSubscriptionExpirationTime(topic, 22);
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
 22));
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic, 
true).intValue(), 22));
+        waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> 
assertEquals(
+                
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(), 
Integer.valueOf(22)));
+
+        admin.topicPolicies().removeSubscriptionExpirationTime(topic);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin.topicPolicies().getSubscriptionExpirationTime(topic)));
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic, 
true).intValue(), 11));
+        waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> 
assertEquals(
+                
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(), 
Integer.valueOf(11)));
+
+        admin.namespaces().removeSubscriptionExpirationTime(myNamespace);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin.namespaces().getSubscriptionExpirationTime(myNamespace)));
+        Awaitility.await().untilAsserted(() -> assertEquals(
+                admin.topicPolicies().getSubscriptionExpirationTime(topic, 
true).intValue(), brokerDefault));
+        waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> 
assertEquals(
+                
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(), 
Integer.valueOf(brokerDefault)));
+
+        admin.topics().delete(topic, true);
+    }
+
+    @Test
+    public void testSubscriptionExpirationTimeRuntimePrecedence() throws 
Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        final String subName = "subscription-expiration-sub";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                .getTopicIfExists(topic).get().get();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
persistentTopic.getSubscription(subName).getCursor();
+        Field cursorLastActiveField = 
ManagedCursorImpl.class.getDeclaredField("lastActive");
+        cursorLastActiveField.setAccessible(true);
+
+        admin.namespaces().setSubscriptionExpirationTime(myNamespace, 1);
+        waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> 
assertEquals(
+                
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(), 
Integer.valueOf(1)));
+
+        cursorLastActiveField.set(cursor, 0L);
+        persistentTopic.checkInactiveSubscriptions();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topics().getSubscriptions(topic).size(), 0));
+
+        admin.topics().createSubscription(topic, subName, MessageId.latest);
+        admin.topicPolicies().setSubscriptionExpirationTime(topic, 0);
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
 0));
+        waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> 
assertEquals(
+                
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(), 
Integer.valueOf(0)));
+
+        persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        cursor = (ManagedCursorImpl) 
persistentTopic.getSubscription(subName).getCursor();
+        cursorLastActiveField.set(cursor, 0L);
+        persistentTopic.checkInactiveSubscriptions();
+        Awaitility.await().during(2, TimeUnit.SECONDS).atMost(5, 
TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertEquals(admin.topics().getSubscriptions(topic).size(), 1));
+
+        admin.topicPolicies().removeSubscriptionExpirationTime(topic);
+        admin.namespaces().removeSubscriptionExpirationTime(myNamespace);
+        admin.topics().delete(topic, true);
+    }
+
     @Test
     public void testGetSetPublishRate() throws Exception {
         PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index c83f3750ff0..bd817de553b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2058,6 +2058,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     @Test
     public void testCheckInactiveSubscriptions() throws Exception {
+        pulsarTestContext.getConfig().setSubscriptionExpirationTimeMinutes(5);
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
 
         final var subscriptions = new ConcurrentHashMap<String, 
PersistentSubscription>();
@@ -2093,8 +2094,6 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
                         new Policies());
 
-        pulsarTestContext.getConfig().setSubscriptionExpirationTimeMinutes(5);
-
         doReturn(System.currentTimeMillis() - 
TimeUnit.MINUTES.toMillis(6)).when(cursorMock).getLastActive();
 
         topic.checkInactiveSubscriptions();
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index 3e985dd7281..69ea350dd1f 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -165,6 +165,83 @@ public interface TopicPolicies {
      */
     void removeDelayedDeliveryPolicy(String topic) throws PulsarAdminException;
 
+    /**
+     * Set subscription expiration time for a topic in minutes.
+     *
+     * @param topic
+     *          Topic name
+     * @param subscriptionExpirationTimeInMinutes
+     *          Subscription expiration time in minutes.
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setSubscriptionExpirationTime(String topic, int 
subscriptionExpirationTimeInMinutes)
+            throws PulsarAdminException;
+
+    /**
+     * Set subscription expiration time for a topic in minutes asynchronously.
+     *
+     * @param topic
+     *          Topic name
+     * @param subscriptionExpirationTimeInMinutes
+     *          Subscription expiration time in minutes.
+     */
+    CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String topic, 
int subscriptionExpirationTimeInMinutes);
+
+    /**
+     * Get subscription expiration time for a topic.
+     *
+     * @param topic
+     * @return Subscription expiration time in minutes.
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Integer getSubscriptionExpirationTime(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Get subscription expiration time for a topic asynchronously.
+     *
+     * @param topic
+     * @return Subscription expiration time in minutes.
+     */
+    CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String 
topic);
+
+    /**
+     * Get applied subscription expiration time for a topic.
+     *
+     * @param topic
+     * @param applied
+     * @return Subscription expiration time in minutes.
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Integer getSubscriptionExpirationTime(String topic, boolean applied) 
throws PulsarAdminException;
+
+    /**
+     * Get applied subscription expiration time for a topic asynchronously.
+     *
+     * @param topic
+     * @param applied
+     * @return Subscription expiration time in minutes.
+     */
+    CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String 
topic, boolean applied);
+
+    /**
+     * Remove subscription expiration time for a topic.
+     *
+     * @param topic
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void removeSubscriptionExpirationTime(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Remove subscription expiration time for a topic asynchronously.
+     *
+     * @param topic
+     */
+    CompletableFuture<Void> removeSubscriptionExpirationTimeAsync(String 
topic);
+
     /**
      * Set message TTL for a topic.
      *
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 6cfa981f1c4..ca607fd63b3 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -404,6 +404,63 @@ public class TopicPoliciesImpl extends BaseResource 
implements TopicPolicies {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setSubscriptionExpirationTime(String topic, int 
subscriptionExpirationTimeInMinutes)
+            throws PulsarAdminException {
+        try {
+            TopicName topicName = validateTopic(topic);
+            WebTarget path = topicPath(topicName, 
"subscriptionExpirationTime");
+            request(path.queryParam("subscriptionExpirationTime", 
subscriptionExpirationTimeInMinutes))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String 
topic,
+                                                                      int 
subscriptionExpirationTimeInMinutes) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscriptionExpirationTime");
+        path = path.queryParam("subscriptionExpirationTime", 
subscriptionExpirationTimeInMinutes);
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public Integer getSubscriptionExpirationTime(String topic) throws 
PulsarAdminException {
+        return getSubscriptionExpirationTime(topic, false);
+    }
+
+    @Override
+    public CompletableFuture<Integer> 
getSubscriptionExpirationTimeAsync(String topic) {
+        return getSubscriptionExpirationTimeAsync(topic, false);
+    }
+
+    @Override
+    public Integer getSubscriptionExpirationTime(String topic, boolean 
applied) throws PulsarAdminException {
+        return sync(() -> getSubscriptionExpirationTimeAsync(topic, applied));
+    }
+
+    @Override
+    public CompletableFuture<Integer> 
getSubscriptionExpirationTimeAsync(String topic, boolean applied) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscriptionExpirationTime");
+        path = path.queryParam("applied", applied);
+        return asyncGetRequest(path, new FutureCallback<Integer>() {});
+    }
+
+    @Override
+    public void removeSubscriptionExpirationTime(String topic) throws 
PulsarAdminException {
+        sync(() -> removeSubscriptionExpirationTimeAsync(topic));
+    }
+
+    @Override
+    public CompletableFuture<Void> 
removeSubscriptionExpirationTimeAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscriptionExpirationTime");
+        return asyncDeleteRequest(path);
+    }
+
     @Override
     public void setMessageTTL(String topic, int messageTTLInSecond) throws 
PulsarAdminException {
         try {
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 675eca867a3..394fec1c8e9 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1233,6 +1233,12 @@ public class PulsarAdminToolTest {
         
verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 
10);
         cmdTopics.run(split("remove-message-ttl 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("get-subscription-expiration-time 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopicsPolicies).getSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1",
 false);
+        cmdTopics.run(split("set-subscription-expiration-time 
persistent://myprop/clust/ns1/ds1 -t 10"));
+        
verify(mockTopicsPolicies).setSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1",
 10);
+        cmdTopics.run(split("remove-subscription-expiration-time 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopicsPolicies).removeSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("get-max-consumers-per-subscription 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopicsPolicies).getMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
@@ -1377,6 +1383,12 @@ public class PulsarAdminToolTest {
         
verify(mockGlobalTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1",
 10);
         cmdTopics.run(split("remove-message-ttl 
persistent://myprop/clust/ns1/ds1 -g"));
         
verify(mockGlobalTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("get-subscription-expiration-time 
persistent://myprop/clust/ns1/ds1 -g"));
+        
verify(mockGlobalTopicsPolicies).getSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1",
 false);
+        cmdTopics.run(split("set-subscription-expiration-time 
persistent://myprop/clust/ns1/ds1 -t 10 -g"));
+        
verify(mockGlobalTopicsPolicies).setSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1",
 10);
+        cmdTopics.run(split("remove-subscription-expiration-time 
persistent://myprop/clust/ns1/ds1 -g"));
+        
verify(mockGlobalTopicsPolicies).removeSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1 
-g"));
         
verify(mockGlobalTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 73032791e2c..b386f4aacb1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -64,6 +64,9 @@ public class CmdTopicPolicies extends CmdBase {
         addCommand("get-message-ttl", new GetMessageTTL());
         addCommand("set-message-ttl", new SetMessageTTL());
         addCommand("remove-message-ttl", new RemoveMessageTTL());
+        addCommand("get-subscription-expiration-time", new 
GetSubscriptionExpirationTime());
+        addCommand("set-subscription-expiration-time", new 
SetSubscriptionExpirationTime());
+        addCommand("remove-subscription-expiration-time", new 
RemoveSubscriptionExpirationTime());
 
         addCommand("get-max-unacked-messages-per-consumer", new 
GetMaxUnackedMessagesPerConsumer());
         addCommand("set-max-unacked-messages-per-consumer", new 
SetMaxUnackedMessagesPerConsumer());
@@ -395,6 +398,61 @@ public class CmdTopicPolicies extends CmdBase {
         }
     }
 
+    @Command(description = "Get subscription expiration time in minutes for a 
topic")
+    private class GetSubscriptionExpirationTime extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Option(names = { "-ap", "--applied" }, description = "Get the applied 
policy of the topic")
+        private boolean applied = false;
+
+        @Option(names = { "--global", "-g" }, description = "Whether to get 
this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            
print(getTopicPolicies(isGlobal).getSubscriptionExpirationTime(persistentTopic, 
applied));
+        }
+    }
+
+    @Command(description = "Set subscription expiration time in minutes for a 
topic")
+    private class SetSubscriptionExpirationTime extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Option(names = { "-t", "--time" }, description = "Subscription 
expiration time in minutes", required = true)
+        private int subscriptionExpirationTimeInMinutes;
+
+        @Option(names = { "--global", "-g" }, description = "Whether to set 
this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            getTopicPolicies(isGlobal)
+                    .setSubscriptionExpirationTime(persistentTopic, 
subscriptionExpirationTimeInMinutes);
+        }
+    }
+
+    @Command(description = "Remove subscription expiration time for a topic")
+    private class RemoveSubscriptionExpirationTime extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Option(names = { "--global", "-g" }, description = "Whether to remove 
this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            
getTopicPolicies(isGlobal).removeSubscriptionExpirationTime(persistentTopic);
+        }
+    }
+
     @Command(description = "Set subscription types enabled for a topic")
     private class SetSubscriptionTypesEnabled extends CliCommand {
         @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 4edb033498b..b476c9d2900 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -45,6 +45,7 @@ public class HierarchyTopicPolicies {
     final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> 
backLogQuotaMap;
     final PolicyHierarchyValue<Integer> topicMaxMessageSize;
     final PolicyHierarchyValue<Integer> messageTTLInSeconds;
+    final PolicyHierarchyValue<Integer> subscriptionExpirationTimeInMinutes;
     final PolicyHierarchyValue<Long> compactionThreshold;
     final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
     final PolicyHierarchyValue<PublishRate> publishRate;
@@ -81,6 +82,7 @@ public class HierarchyTopicPolicies {
                 .build();
         topicMaxMessageSize = new PolicyHierarchyValue<>();
         messageTTLInSeconds = new PolicyHierarchyValue<>();
+        subscriptionExpirationTimeInMinutes = new PolicyHierarchyValue<>();
         publishRate = new PolicyHierarchyValue<>();
         delayedDeliveryEnabled = new PolicyHierarchyValue<>();
         dispatcherPauseOnAckStatePersistentEnabled = new 
PolicyHierarchyValue<>();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 842c67714d5..c9b5e1a3e1e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -56,6 +56,7 @@ public class TopicPolicies implements Cloneable {
     private RetentionPolicies retentionPolicies;
     private Boolean deduplicationEnabled;
     private Integer messageTTLInSeconds;
+    private Integer subscriptionExpirationTimeInMinutes;
     private Integer maxProducerPerTopic;
     private Integer maxConsumerPerTopic;
     private Integer maxConsumersPerSubscription;
@@ -222,6 +223,10 @@ public class TopicPolicies implements Cloneable {
         return messageTTLInSeconds != null;
     }
 
+    public boolean isSubscriptionExpirationTimeInMinutesSet() {
+        return subscriptionExpirationTimeInMinutes != null;
+    }
+
     public boolean isMaxProducerPerTopicSet() {
         return maxProducerPerTopic != null;
     }


Reply via email to