This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c27ec7d2c32 [feat][broker] add topic-level subscription expiration
time policy (#25258)
c27ec7d2c32 is described below
commit c27ec7d2c32275df3f221e6cefeb4d3f01551b76
Author: Penghui Li <[email protected]>
AuthorDate: Thu Feb 26 04:16:56 2026 -0800
[feat][broker] add topic-level subscription expiration time policy (#25258)
---
.../broker/admin/impl/PersistentTopicsBase.java | 31 ++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 84 ++++++++++++++++
.../pulsar/broker/service/AbstractTopic.java | 6 ++
.../broker/service/persistent/PersistentTopic.java | 18 +---
.../broker/admin/TopicPoliciesAuthZTest.java | 84 ++++++++++++++++
.../broker/admin/TopicPoliciesDisableTest.java | 27 +++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 112 +++++++++++++++++++++
.../pulsar/broker/service/PersistentTopicTest.java | 3 +-
.../apache/pulsar/client/admin/TopicPolicies.java | 77 ++++++++++++++
.../client/admin/internal/TopicPoliciesImpl.java | 57 +++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 12 +++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 58 +++++++++++
.../policies/data/HierarchyTopicPolicies.java | 2 +
.../pulsar/common/policies/data/TopicPolicies.java | 5 +
14 files changed, 558 insertions(+), 18 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 99c0b90d45d..76b3247c599 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3481,6 +3481,37 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected CompletableFuture<Integer>
internalGetSubscriptionExpirationTime(boolean applied, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenApply(op ->
op.map(TopicPolicies::getSubscriptionExpirationTimeInMinutes)
+ .orElseGet(() -> {
+ if (applied) {
+ Integer namespacePolicy =
getNamespacePolicies(namespaceName)
+ .subscription_expiration_time_minutes;
+ return namespacePolicy == null
+ ?
config().getSubscriptionExpirationTimeMinutes()
+ : namespacePolicy;
+ }
+ return null;
+ }));
+ }
+
+ protected CompletableFuture<Void>
internalSetSubscriptionExpirationTime(Integer expirationTimeToSet,
+
boolean isGlobal) {
+ if (expirationTimeToSet != null && expirationTimeToSet < 0) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Invalid value for subscription expiration time"));
+ }
+
+ return pulsar().getTopicPoliciesService()
+ .updateTopicPoliciesAsync(topicName, isGlobal,
expirationTimeToSet == null, policies -> {
+
policies.setSubscriptionExpirationTimeInMinutes(expirationTimeToSet);
+ log.info("[{}] Successfully set topic subscription
expiration time: namespace={}, "
+ + "topic={}, time={}",
+ clientAppId(), namespaceName,
topicName.getLocalName(), expirationTimeToSet);
+ });
+ }
+
protected CompletableFuture<Void> internalSetMessageTTL(Integer
ttlInSecondToSet, boolean isGlobal) {
//Validate message ttl value.
if (ttlInSecondToSet != null && ttlInSecondToSet < 0) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fc34164b5cf..3d54d73ff45 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2454,6 +2454,90 @@ public class PersistentTopics extends
PersistentTopicsBase {
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionExpirationTime")
+ @ApiOperation(value = "Get subscription expiration time in minutes for a
topic", response = Integer.class)
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or topic doesn't
exist"),
+ @ApiResponse(code = 405, message =
+ "Topic level policy is disabled, enable the topic level
policy and retry")})
+ public void getSubscriptionExpirationTime(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("applied") @DefaultValue("false") boolean applied,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
+ .thenCompose(__ ->
internalGetSubscriptionExpirationTime(applied, isGlobal))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+
handleTopicPolicyException("getSubscriptionExpirationTime", ex, asyncResponse);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionExpirationTime")
+ @ApiOperation(value = "Set subscription expiration time in minutes for a
topic")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Operation successful"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or topic doesn't
exist"),
+ @ApiResponse(code = 405, message =
+ "Topic level policy is disabled, enable the topic level
policy and retry"),
+ @ApiResponse(code = 412, message = "Invalid subscription
expiration time value")})
+ public void setSubscriptionExpirationTime(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Subscription expiration time in minutes",
required = true)
+ @QueryParam("subscriptionExpirationTime") Integer
subscriptionExpirationTime,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
+ .thenCompose(__ ->
internalSetSubscriptionExpirationTime(subscriptionExpirationTime, isGlobal))
+ .thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+
handleTopicPolicyException("setSubscriptionExpirationTime", ex, asyncResponse);
+ return null;
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionExpirationTime")
+ @ApiOperation(value = "Remove subscription expiration time for a topic")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Operation successful"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or topic doesn't
exist"),
+ @ApiResponse(code = 405, message =
+ "Topic level policy is disabled, enable the topic level
policy and retry"),
+ @ApiResponse(code = 412, message = "Invalid subscription
expiration time value")})
+ public void removeSubscriptionExpirationTime(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
+ .thenCompose(__ -> internalSetSubscriptionExpirationTime(null,
isGlobal))
+ .thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+
handleTopicPolicyException("removeSubscriptionExpirationTime", ex,
asyncResponse);
+ return null;
+ });
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Get message TTL in seconds for a topic", response =
Integer.class)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 0ac5ae999b0..c6a4c44398d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -281,6 +281,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
isGlobalPolicies);
topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()),
isGlobalPolicies);
+ topicPolicies.getSubscriptionExpirationTimeInMinutes()
+
.updateTopicValue(normalizeValue(data.getSubscriptionExpirationTimeInMinutes()),
isGlobalPolicies);
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()),
isGlobalPolicies);
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled(),
isGlobalPolicies);
topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -324,6 +326,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
topicPolicies.getMessageTTLInSeconds()
.updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+ topicPolicies.getSubscriptionExpirationTimeInMinutes()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.subscription_expiration_time_minutes));
topicPolicies.getMaxSubscriptionsPerTopic()
.updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
topicPolicies.getMaxProducersPerTopic()
@@ -444,6 +448,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
+ topicPolicies.getSubscriptionExpirationTimeInMinutes()
+
.updateBrokerValue(config.getSubscriptionExpirationTimeMinutes());
topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5a1e0e940a8..da4d717e798 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3559,22 +3559,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public void checkInactiveSubscriptions() {
- TopicName name = TopicName.get(topic);
- try {
- Policies policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources()
- .getPolicies(name.getNamespaceObject())
- .orElseThrow(() -> new
MetadataStoreException.NotFoundException());
- final int defaultExpirationTime =
brokerService.pulsar().getConfiguration()
- .getSubscriptionExpirationTimeMinutes();
- final Integer nsExpirationTime =
policies.subscription_expiration_time_minutes;
- final long expirationTimeMillis = TimeUnit.MINUTES
- .toMillis(nsExpirationTime == null ? defaultExpirationTime
: nsExpirationTime);
- checkInactiveSubscriptions(expirationTimeMillis);
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Error getting policies", topic);
- }
- }
+ Integer expirationTime =
topicPolicies.getSubscriptionExpirationTimeInMinutes().get();
+ checkInactiveSubscriptions(TimeUnit.MINUTES.toMillis(expirationTime));
}
@VisibleForTesting
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index b758e878507..b15ecbe9600 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -492,4 +492,88 @@ public final class TopicPoliciesAuthZTest extends
MockedPulsarStandalone {
}
}
+
+ @SneakyThrows
+ @Test
+ public void testSubscriptionExpirationTime() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ int expirationTimeInMinutes = 10;
+
+ // test superuser
+ superUserAdmin.topicPolicies().setSubscriptionExpirationTime(topic,
expirationTimeInMinutes);
+ await().untilAsserted(() -> Assert.assertEquals(
+
superUserAdmin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
+ expirationTimeInMinutes));
+ superUserAdmin.topicPolicies().removeSubscriptionExpirationTime(topic);
+ await().untilAsserted(() ->
+
Assert.assertNull(superUserAdmin.topicPolicies().getSubscriptionExpirationTime(topic)));
+
+ // test tenant manager
+
tenantManagerAdmin.topicPolicies().setSubscriptionExpirationTime(topic,
expirationTimeInMinutes);
+ await().untilAsserted(() -> Assert.assertEquals(
+
tenantManagerAdmin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
+ expirationTimeInMinutes));
+
tenantManagerAdmin.topicPolicies().removeSubscriptionExpirationTime(topic);
+ await().untilAsserted(() ->
+
Assert.assertNull(tenantManagerAdmin.topicPolicies().getSubscriptionExpirationTime(topic)));
+
+ // test nobody
+ try {
+ subAdmin.topicPolicies().getSubscriptionExpirationTime(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().setSubscriptionExpirationTime(topic,
expirationTimeInMinutes);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeSubscriptionExpirationTime(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
subject, Set.of(action));
+ try {
+ subAdmin.topicPolicies().getSubscriptionExpirationTime(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().setSubscriptionExpirationTime(topic,
expirationTimeInMinutes);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
subAdmin.topicPolicies().removeSubscriptionExpirationTime(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof
PulsarAdminException.NotAuthorizedException);
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default",
subject);
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 29c659a23da..98465e233b8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -228,6 +228,33 @@ public class TopicPoliciesDisableTest extends
MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testSubscriptionExpirationTimeDisabled() throws Exception {
+ int subscriptionExpirationTime = 10;
+ log.info("SubscriptionExpirationTime: {} will set to the topic: {}",
subscriptionExpirationTime, testTopic);
+
+ try {
+ admin.topicPolicies().setSubscriptionExpirationTime(testTopic,
subscriptionExpirationTime);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(),
HttpStatus.METHOD_NOT_ALLOWED_405);
+ }
+
+ try {
+ admin.topicPolicies().getSubscriptionExpirationTime(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(),
HttpStatus.METHOD_NOT_ALLOWED_405);
+ }
+
+ try {
+ admin.topicPolicies().removeSubscriptionExpirationTime(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(),
HttpStatus.METHOD_NOT_ALLOWED_405);
+ }
+ }
+
@Test
public void testPublishRateDisabled() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 3be7acad770..86d92e421f7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2044,6 +2044,118 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(testTopic, true);
}
+ @Test
+ public void testGetSetRemoveSubscriptionExpirationTime() throws Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ int expirationTimeInMinutes = 10;
+
+ admin.topicPolicies().setSubscriptionExpirationTime(topic,
expirationTimeInMinutes);
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(
+
admin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
+ expirationTimeInMinutes));
+
+ admin.topicPolicies().removeSubscriptionExpirationTime(topic);
+ Awaitility.await().untilAsserted(() ->
+
Assert.assertNull(admin.topicPolicies().getSubscriptionExpirationTime(topic)));
+
+ try {
+ admin.topicPolicies().setSubscriptionExpirationTime(topic, -1);
+ fail("Setting negative subscription expiration time should fail");
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 412);
+ }
+
+ admin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testSubscriptionExpirationTimeAppliedAndHierarchy() throws
Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+
+ int brokerDefault = conf.getSubscriptionExpirationTimeMinutes();
+
Assert.assertNull(admin.topicPolicies().getSubscriptionExpirationTime(topic));
+
Assert.assertNull(admin.namespaces().getSubscriptionExpirationTime(myNamespace));
+
Assert.assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic,
true).intValue(), brokerDefault);
+ waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies ->
assertEquals(
+
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(),
Integer.valueOf(brokerDefault)));
+
+ admin.namespaces().setSubscriptionExpirationTime(myNamespace, 11);
+ Awaitility.await().untilAsserted(() ->
+
assertEquals(admin.namespaces().getSubscriptionExpirationTime(myNamespace).intValue(),
11));
+ Awaitility.await().untilAsserted(() ->
+
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic,
true).intValue(), 11));
+ waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies ->
assertEquals(
+
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(),
Integer.valueOf(11)));
+
+ admin.topicPolicies().setSubscriptionExpirationTime(topic, 22);
+ Awaitility.await().untilAsserted(() ->
+
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
22));
+ Awaitility.await().untilAsserted(() ->
+
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic,
true).intValue(), 22));
+ waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies ->
assertEquals(
+
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(),
Integer.valueOf(22)));
+
+ admin.topicPolicies().removeSubscriptionExpirationTime(topic);
+ Awaitility.await().untilAsserted(() ->
+
assertNull(admin.topicPolicies().getSubscriptionExpirationTime(topic)));
+ Awaitility.await().untilAsserted(() ->
+
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic,
true).intValue(), 11));
+ waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies ->
assertEquals(
+
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(),
Integer.valueOf(11)));
+
+ admin.namespaces().removeSubscriptionExpirationTime(myNamespace);
+ Awaitility.await().untilAsserted(() ->
+
assertNull(admin.namespaces().getSubscriptionExpirationTime(myNamespace)));
+ Awaitility.await().untilAsserted(() -> assertEquals(
+ admin.topicPolicies().getSubscriptionExpirationTime(topic,
true).intValue(), brokerDefault));
+ waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies ->
assertEquals(
+
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(),
Integer.valueOf(brokerDefault)));
+
+ admin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testSubscriptionExpirationTimeRuntimePrecedence() throws
Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ final String subName = "subscription-expiration-sub";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopicIfExists(topic).get().get();
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
persistentTopic.getSubscription(subName).getCursor();
+ Field cursorLastActiveField =
ManagedCursorImpl.class.getDeclaredField("lastActive");
+ cursorLastActiveField.setAccessible(true);
+
+ admin.namespaces().setSubscriptionExpirationTime(myNamespace, 1);
+ waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies ->
assertEquals(
+
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(),
Integer.valueOf(1)));
+
+ cursorLastActiveField.set(cursor, 0L);
+ persistentTopic.checkInactiveSubscriptions();
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getSubscriptions(topic).size(), 0));
+
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+ admin.topicPolicies().setSubscriptionExpirationTime(topic, 0);
+ Awaitility.await().untilAsserted(() ->
+
assertEquals(admin.topicPolicies().getSubscriptionExpirationTime(topic).intValue(),
0));
+ waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies ->
assertEquals(
+
hierarchyTopicPolicies.getSubscriptionExpirationTimeInMinutes().get(),
Integer.valueOf(0)));
+
+ persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+ cursor = (ManagedCursorImpl)
persistentTopic.getSubscription(subName).getCursor();
+ cursorLastActiveField.set(cursor, 0L);
+ persistentTopic.checkInactiveSubscriptions();
+ Awaitility.await().during(2, TimeUnit.SECONDS).atMost(5,
TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertEquals(admin.topics().getSubscriptions(topic).size(), 1));
+
+ admin.topicPolicies().removeSubscriptionExpirationTime(topic);
+ admin.namespaces().removeSubscriptionExpirationTime(myNamespace);
+ admin.topics().delete(topic, true);
+ }
+
@Test
public void testGetSetPublishRate() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index c83f3750ff0..bd817de553b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2058,6 +2058,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testCheckInactiveSubscriptions() throws Exception {
+ pulsarTestContext.getConfig().setSubscriptionExpirationTimeMinutes(5);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
final var subscriptions = new ConcurrentHashMap<String,
PersistentSubscription>();
@@ -2093,8 +2094,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
new Policies());
- pulsarTestContext.getConfig().setSubscriptionExpirationTimeMinutes(5);
-
doReturn(System.currentTimeMillis() -
TimeUnit.MINUTES.toMillis(6)).when(cursorMock).getLastActive();
topic.checkInactiveSubscriptions();
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index 3e985dd7281..69ea350dd1f 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -165,6 +165,83 @@ public interface TopicPolicies {
*/
void removeDelayedDeliveryPolicy(String topic) throws PulsarAdminException;
+ /**
+ * Set subscription expiration time for a topic in minutes.
+ *
+ * @param topic
+ * Topic name
+ * @param subscriptionExpirationTimeInMinutes
+ * Subscription expiration time in minutes.
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setSubscriptionExpirationTime(String topic, int
subscriptionExpirationTimeInMinutes)
+ throws PulsarAdminException;
+
+ /**
+ * Set subscription expiration time for a topic in minutes asynchronously.
+ *
+ * @param topic
+ * Topic name
+ * @param subscriptionExpirationTimeInMinutes
+ * Subscription expiration time in minutes.
+ */
+ CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String topic,
int subscriptionExpirationTimeInMinutes);
+
+ /**
+ * Get subscription expiration time for a topic.
+ *
+ * @param topic
+ * @return Subscription expiration time in minutes.
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Integer getSubscriptionExpirationTime(String topic) throws
PulsarAdminException;
+
+ /**
+ * Get subscription expiration time for a topic asynchronously.
+ *
+ * @param topic
+ * @return Subscription expiration time in minutes.
+ */
+ CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String
topic);
+
+ /**
+ * Get applied subscription expiration time for a topic.
+ *
+ * @param topic
+ * @param applied
+ * @return Subscription expiration time in minutes.
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Integer getSubscriptionExpirationTime(String topic, boolean applied)
throws PulsarAdminException;
+
+ /**
+ * Get applied subscription expiration time for a topic asynchronously.
+ *
+ * @param topic
+ * @param applied
+ * @return Subscription expiration time in minutes.
+ */
+ CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String
topic, boolean applied);
+
+ /**
+ * Remove subscription expiration time for a topic.
+ *
+ * @param topic
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeSubscriptionExpirationTime(String topic) throws
PulsarAdminException;
+
+ /**
+ * Remove subscription expiration time for a topic asynchronously.
+ *
+ * @param topic
+ */
+ CompletableFuture<Void> removeSubscriptionExpirationTimeAsync(String
topic);
+
/**
* Set message TTL for a topic.
*
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 6cfa981f1c4..ca607fd63b3 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -404,6 +404,63 @@ public class TopicPoliciesImpl extends BaseResource
implements TopicPolicies {
return asyncDeleteRequest(path);
}
+ @Override
+ public void setSubscriptionExpirationTime(String topic, int
subscriptionExpirationTimeInMinutes)
+ throws PulsarAdminException {
+ try {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName,
"subscriptionExpirationTime");
+ request(path.queryParam("subscriptionExpirationTime",
subscriptionExpirationTimeInMinutes))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String
topic,
+ int
subscriptionExpirationTimeInMinutes) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionExpirationTime");
+ path = path.queryParam("subscriptionExpirationTime",
subscriptionExpirationTimeInMinutes);
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public Integer getSubscriptionExpirationTime(String topic) throws
PulsarAdminException {
+ return getSubscriptionExpirationTime(topic, false);
+ }
+
+ @Override
+ public CompletableFuture<Integer>
getSubscriptionExpirationTimeAsync(String topic) {
+ return getSubscriptionExpirationTimeAsync(topic, false);
+ }
+
+ @Override
+ public Integer getSubscriptionExpirationTime(String topic, boolean
applied) throws PulsarAdminException {
+ return sync(() -> getSubscriptionExpirationTimeAsync(topic, applied));
+ }
+
+ @Override
+ public CompletableFuture<Integer>
getSubscriptionExpirationTimeAsync(String topic, boolean applied) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionExpirationTime");
+ path = path.queryParam("applied", applied);
+ return asyncGetRequest(path, new FutureCallback<Integer>() {});
+ }
+
+ @Override
+ public void removeSubscriptionExpirationTime(String topic) throws
PulsarAdminException {
+ sync(() -> removeSubscriptionExpirationTimeAsync(topic));
+ }
+
+ @Override
+ public CompletableFuture<Void>
removeSubscriptionExpirationTimeAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionExpirationTime");
+ return asyncDeleteRequest(path);
+ }
+
@Override
public void setMessageTTL(String topic, int messageTTLInSecond) throws
PulsarAdminException {
try {
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 675eca867a3..394fec1c8e9 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1233,6 +1233,12 @@ public class PulsarAdminToolTest {
verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1",
10);
cmdTopics.run(split("remove-message-ttl
persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-subscription-expiration-time
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopicsPolicies).getSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1",
false);
+ cmdTopics.run(split("set-subscription-expiration-time
persistent://myprop/clust/ns1/ds1 -t 10"));
+
verify(mockTopicsPolicies).setSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1",
10);
+ cmdTopics.run(split("remove-subscription-expiration-time
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopicsPolicies).removeSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-max-consumers-per-subscription
persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
@@ -1377,6 +1383,12 @@ public class PulsarAdminToolTest {
verify(mockGlobalTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1",
10);
cmdTopics.run(split("remove-message-ttl
persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-subscription-expiration-time
persistent://myprop/clust/ns1/ds1 -g"));
+
verify(mockGlobalTopicsPolicies).getSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1",
false);
+ cmdTopics.run(split("set-subscription-expiration-time
persistent://myprop/clust/ns1/ds1 -t 10 -g"));
+
verify(mockGlobalTopicsPolicies).setSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1",
10);
+ cmdTopics.run(split("remove-subscription-expiration-time
persistent://myprop/clust/ns1/ds1 -g"));
+
verify(mockGlobalTopicsPolicies).removeSubscriptionExpirationTime("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1
-g"));
verify(mockGlobalTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 73032791e2c..b386f4aacb1 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -64,6 +64,9 @@ public class CmdTopicPolicies extends CmdBase {
addCommand("get-message-ttl", new GetMessageTTL());
addCommand("set-message-ttl", new SetMessageTTL());
addCommand("remove-message-ttl", new RemoveMessageTTL());
+ addCommand("get-subscription-expiration-time", new
GetSubscriptionExpirationTime());
+ addCommand("set-subscription-expiration-time", new
SetSubscriptionExpirationTime());
+ addCommand("remove-subscription-expiration-time", new
RemoveSubscriptionExpirationTime());
addCommand("get-max-unacked-messages-per-consumer", new
GetMaxUnackedMessagesPerConsumer());
addCommand("set-max-unacked-messages-per-consumer", new
SetMaxUnackedMessagesPerConsumer());
@@ -395,6 +398,61 @@ public class CmdTopicPolicies extends CmdBase {
}
}
+ @Command(description = "Get subscription expiration time in minutes for a
topic")
+ private class GetSubscriptionExpirationTime extends CliCommand {
+ @Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
+ private String topicName;
+
+ @Option(names = { "-ap", "--applied" }, description = "Get the applied
policy of the topic")
+ private boolean applied = false;
+
+ @Option(names = { "--global", "-g" }, description = "Whether to get
this policy globally. "
+ + "If set to true, broker returned global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(topicName);
+
print(getTopicPolicies(isGlobal).getSubscriptionExpirationTime(persistentTopic,
applied));
+ }
+ }
+
+ @Command(description = "Set subscription expiration time in minutes for a
topic")
+ private class SetSubscriptionExpirationTime extends CliCommand {
+ @Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
+ private String topicName;
+
+ @Option(names = { "-t", "--time" }, description = "Subscription
expiration time in minutes", required = true)
+ private int subscriptionExpirationTimeInMinutes;
+
+ @Option(names = { "--global", "-g" }, description = "Whether to set
this policy globally. "
+ + "If set to true, broker returned global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(topicName);
+ getTopicPolicies(isGlobal)
+ .setSubscriptionExpirationTime(persistentTopic,
subscriptionExpirationTimeInMinutes);
+ }
+ }
+
+ @Command(description = "Remove subscription expiration time for a topic")
+ private class RemoveSubscriptionExpirationTime extends CliCommand {
+ @Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
+ private String topicName;
+
+ @Option(names = { "--global", "-g" }, description = "Whether to remove
this policy globally. "
+ + "If set to true, broker returned global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(topicName);
+
getTopicPolicies(isGlobal).removeSubscriptionExpirationTime(persistentTopic);
+ }
+ }
+
@Command(description = "Set subscription types enabled for a topic")
private class SetSubscriptionTypesEnabled extends CliCommand {
@Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 4edb033498b..b476c9d2900 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -45,6 +45,7 @@ public class HierarchyTopicPolicies {
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>
backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
+ final PolicyHierarchyValue<Integer> subscriptionExpirationTimeInMinutes;
final PolicyHierarchyValue<Long> compactionThreshold;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
final PolicyHierarchyValue<PublishRate> publishRate;
@@ -81,6 +82,7 @@ public class HierarchyTopicPolicies {
.build();
topicMaxMessageSize = new PolicyHierarchyValue<>();
messageTTLInSeconds = new PolicyHierarchyValue<>();
+ subscriptionExpirationTimeInMinutes = new PolicyHierarchyValue<>();
publishRate = new PolicyHierarchyValue<>();
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
dispatcherPauseOnAckStatePersistentEnabled = new
PolicyHierarchyValue<>();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 842c67714d5..c9b5e1a3e1e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -56,6 +56,7 @@ public class TopicPolicies implements Cloneable {
private RetentionPolicies retentionPolicies;
private Boolean deduplicationEnabled;
private Integer messageTTLInSeconds;
+ private Integer subscriptionExpirationTimeInMinutes;
private Integer maxProducerPerTopic;
private Integer maxConsumerPerTopic;
private Integer maxConsumersPerSubscription;
@@ -222,6 +223,10 @@ public class TopicPolicies implements Cloneable {
return messageTTLInSeconds != null;
}
+ public boolean isSubscriptionExpirationTimeInMinutesSet() {
+ return subscriptionExpirationTimeInMinutes != null;
+ }
+
public boolean isMaxProducerPerTopicSet() {
return maxProducerPerTopic != null;
}