This is an automated email from the ASF dual-hosted git repository.
rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 35646746 feat(namespace): add removal methods for namespace policies
(#1465)
35646746 is described below
commit 3564674615ac147075a299aafa8101ccbc2846cf
Author: Rui Fu <[email protected]>
AuthorDate: Wed Feb 25 13:44:07 2026 +0800
feat(namespace): add removal methods for namespace policies (#1465)
* feat(namespace): add removal methods for namespace configs
* docs(namespace): refine comment for RemoveNamespaceMessageTTLWithContext
---
pulsaradmin/pkg/admin/namespace.go | 98 +++++++++++++++++++++++++++++++++
pulsaradmin/pkg/admin/namespace_test.go | 84 ++++++++++++++++------------
2 files changed, 148 insertions(+), 34 deletions(-)
diff --git a/pulsaradmin/pkg/admin/namespace.go
b/pulsaradmin/pkg/admin/namespace.go
index 1d24b5c5..05afe719 100644
--- a/pulsaradmin/pkg/admin/namespace.go
+++ b/pulsaradmin/pkg/admin/namespace.go
@@ -94,6 +94,12 @@ type Namespaces interface {
// GetNamespaceMessageTTLWithContext returns the message TTL for a
namespace. Returns -1 if not set
GetNamespaceMessageTTLWithContext(ctx context.Context, namespace
string) (int, error)
+ // RemoveNamespaceMessageTTL removes the message TTL configuration for
a namespace, defaulting to broker settings
+ RemoveNamespaceMessageTTL(namespace string) error
+
+ // RemoveNamespaceMessageTTLWithContext removes the message TTL for a
namespace, defaulting to broker settings
+ RemoveNamespaceMessageTTLWithContext(ctx context.Context, namespace
string) error
+
// GetRetention returns the retention configuration for a namespace.
// Returns nil if the retention policy is not configured at the
namespace level.
GetRetention(namespace string) (*utils.RetentionPolicies, error)
@@ -108,6 +114,12 @@ type Namespaces interface {
// SetRetentionWithContext sets the retention configuration for all the
topics on a namespace
SetRetentionWithContext(ctx context.Context, namespace string, policy
utils.RetentionPolicies) error
+ // RemoveRetention removes the retention configuration for a namespace,
defaulting to broker settings
+ RemoveRetention(namespace string) error
+
+ // RemoveRetentionWithContext removes the retention configuration for a
namespace, defaulting to broker settings
+ RemoveRetentionWithContext(ctx context.Context, namespace string) error
+
// GetBacklogQuotaMap returns backlog quota map on a namespace
GetBacklogQuotaMap(namespace string)
(map[utils.BacklogQuotaType]utils.BacklogQuota, error)
@@ -288,6 +300,14 @@ type Namespaces interface {
// Returns -1 if not set
GetMaxConsumersPerSubscriptionWithContext(ctx context.Context,
namespace utils.NameSpaceName) (int, error)
+ // RemoveMaxConsumersPerSubscription removes
maxConsumersPerSubscription configuration for a namespace,
+ // defaulting to broker settings
+ RemoveMaxConsumersPerSubscription(namespace utils.NameSpaceName) error
+
+ // RemoveMaxConsumersPerSubscriptionWithContext removes
maxConsumersPerSubscription configuration for a namespace,
+ // defaulting to broker settings
+ RemoveMaxConsumersPerSubscriptionWithContext(ctx context.Context,
namespace utils.NameSpaceName) error
+
// SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace.
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxConsumersPerTopic(namespace utils.NameSpaceName, max int) error
@@ -302,6 +322,14 @@ type Namespaces interface {
// GetMaxConsumersPerTopicWithContext returns the maxConsumersPerTopic
for a namespace. Returns -1 if not set
GetMaxConsumersPerTopicWithContext(ctx context.Context, namespace
utils.NameSpaceName) (int, error)
+ // RemoveMaxConsumersPerTopic removes maxConsumersPerTopic
configuration for a namespace,
+ // defaulting to broker settings
+ RemoveMaxConsumersPerTopic(namespace utils.NameSpaceName) error
+
+ // RemoveMaxConsumersPerTopicWithContext removes maxConsumersPerTopic
configuration for a namespace,
+ // defaulting to broker settings
+ RemoveMaxConsumersPerTopicWithContext(ctx context.Context, namespace
utils.NameSpaceName) error
+
// SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace.
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxProducersPerTopic(namespace utils.NameSpaceName, max int) error
@@ -316,6 +344,14 @@ type Namespaces interface {
// GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic
for a namespace. Returns -1 if not set
GetMaxProducersPerTopicWithContext(ctx context.Context, namespace
utils.NameSpaceName) (int, error)
+ // RemoveMaxProducersPerTopic removes maxProducersPerTopic
configuration for a namespace,
+ // defaulting to broker settings
+ RemoveMaxProducersPerTopic(namespace utils.NameSpaceName) error
+
+ // RemoveMaxProducersPerTopicWithContext removes maxProducersPerTopic
configuration for a namespace,
+ // defaulting to broker settings
+ RemoveMaxProducersPerTopicWithContext(ctx context.Context, namespace
utils.NameSpaceName) error
+
// SetMaxTopicsPerNamespace sets maxTopicsPerNamespace for a namespace.
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxTopicsPerNamespace(namespace utils.NameSpaceName, max int) error
@@ -891,6 +927,19 @@ func (n *namespaces) SetNamespaceMessageTTLWithContext(ctx
context.Context, name
return n.pulsar.Client.PostWithContext(ctx, endpoint, &ttlInSeconds)
}
+func (n *namespaces) RemoveNamespaceMessageTTL(namespace string) error {
+ return n.RemoveNamespaceMessageTTLWithContext(context.Background(),
namespace)
+}
+
+func (n *namespaces) RemoveNamespaceMessageTTLWithContext(ctx context.Context,
namespace string) error {
+ nsName, err := utils.GetNamespaceName(namespace)
+ if err != nil {
+ return err
+ }
+ endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "messageTTL")
+ return n.pulsar.Client.DeleteWithContext(ctx, endpoint)
+}
+
func (n *namespaces) SetRetention(namespace string, policy
utils.RetentionPolicies) error {
return n.SetRetentionWithContext(context.Background(), namespace,
policy)
}
@@ -926,6 +975,19 @@ func (n *namespaces) GetRetentionWithContext(ctx
context.Context, namespace stri
return nil, err
}
+func (n *namespaces) RemoveRetention(namespace string) error {
+ return n.RemoveRetentionWithContext(context.Background(), namespace)
+}
+
+func (n *namespaces) RemoveRetentionWithContext(ctx context.Context, namespace
string) error {
+ nsName, err := utils.GetNamespaceName(namespace)
+ if err != nil {
+ return err
+ }
+ endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "retention")
+ return n.pulsar.Client.DeleteWithContext(ctx, endpoint)
+}
+
func (n *namespaces) GetBacklogQuotaMap(namespace string)
(map[utils.BacklogQuotaType]utils.BacklogQuota, error) {
return n.GetBacklogQuotaMapWithContext(context.Background(), namespace)
}
@@ -1171,6 +1233,18 @@ func (n *namespaces)
GetMaxConsumersPerSubscriptionWithContext(
return result, err
}
+func (n *namespaces) RemoveMaxConsumersPerSubscription(namespace
utils.NameSpaceName) error {
+ return
n.RemoveMaxConsumersPerSubscriptionWithContext(context.Background(), namespace)
+}
+
+func (n *namespaces) RemoveMaxConsumersPerSubscriptionWithContext(
+ ctx context.Context,
+ namespace utils.NameSpaceName,
+) error {
+ endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"maxConsumersPerSubscription")
+ return n.pulsar.Client.DeleteWithContext(ctx, endpoint)
+}
+
func (n *namespaces) SetOffloadThreshold(namespace utils.NameSpaceName,
threshold int64) error {
return n.SetOffloadThresholdWithContext(context.Background(),
namespace, threshold)
}
@@ -1251,6 +1325,18 @@ func (n *namespaces) GetMaxConsumersPerTopicWithContext(
return result, err
}
+func (n *namespaces) RemoveMaxConsumersPerTopic(namespace utils.NameSpaceName)
error {
+ return n.RemoveMaxConsumersPerTopicWithContext(context.Background(),
namespace)
+}
+
+func (n *namespaces) RemoveMaxConsumersPerTopicWithContext(
+ ctx context.Context,
+ namespace utils.NameSpaceName,
+) error {
+ endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"maxConsumersPerTopic")
+ return n.pulsar.Client.DeleteWithContext(ctx, endpoint)
+}
+
func (n *namespaces) SetCompactionThreshold(namespace utils.NameSpaceName,
threshold int64) error {
return n.SetCompactionThresholdWithContext(context.Background(),
namespace, threshold)
}
@@ -1307,6 +1393,18 @@ func (n *namespaces) GetMaxProducersPerTopicWithContext(
return result, err
}
+func (n *namespaces) RemoveMaxProducersPerTopic(namespace utils.NameSpaceName)
error {
+ return n.RemoveMaxProducersPerTopicWithContext(context.Background(),
namespace)
+}
+
+func (n *namespaces) RemoveMaxProducersPerTopicWithContext(
+ ctx context.Context,
+ namespace utils.NameSpaceName,
+) error {
+ endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"maxProducersPerTopic")
+ return n.pulsar.Client.DeleteWithContext(ctx, endpoint)
+}
+
func (n *namespaces) GetNamespaceReplicationClusters(namespace string)
([]string, error) {
return
n.GetNamespaceReplicationClustersWithContext(context.Background(), namespace)
}
diff --git a/pulsaradmin/pkg/admin/namespace_test.go
b/pulsaradmin/pkg/admin/namespace_test.go
index 1796598e..f14027dd 100644
--- a/pulsaradmin/pkg/admin/namespace_test.go
+++ b/pulsaradmin/pkg/admin/namespace_test.go
@@ -610,28 +610,23 @@ func TestNamespaces_MessageTTL(t *testing.T) {
namespace, _ := utils.GetNamespaceName("public/default")
- // Get default (should be -1)
ttl, err :=
admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
assert.NoError(t, err)
- assert.Equal(t, -1, ttl)
+ initialTTL := ttl
- // Set to 0 explicitly
- err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(), 0)
+ err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(),
3600)
assert.NoError(t, err)
- // Verify returns 0
ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
assert.NoError(t, err)
- assert.Equal(t, 0, ttl)
+ assert.Equal(t, 3600, ttl)
- // Set to positive value
- err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(),
3600)
+ err = admin.Namespaces().RemoveNamespaceMessageTTL(namespace.String())
assert.NoError(t, err)
- // Verify returns value
ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
assert.NoError(t, err)
- assert.Equal(t, 3600, ttl)
+ assert.Equal(t, initialTTL, ttl)
}
func TestNamespaces_OffloadDeleteLag(t *testing.T) {
@@ -674,28 +669,50 @@ func TestNamespaces_MaxConsumersPerTopic(t *testing.T) {
namespace, _ := utils.GetNamespaceName("public/default")
- // Get default (should be -1)
maxConsumers, err :=
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
assert.NoError(t, err)
- assert.Equal(t, -1, maxConsumers)
+ initialMaxConsumers := maxConsumers
- // Set to 0 explicitly
- err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 0)
+ err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 100)
assert.NoError(t, err)
- // Verify returns 0
maxConsumers, err =
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
assert.NoError(t, err)
- assert.Equal(t, 0, maxConsumers)
+ assert.Equal(t, 100, maxConsumers)
- // Set to positive value
- err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 100)
+ err = admin.Namespaces().RemoveMaxConsumersPerTopic(*namespace)
assert.NoError(t, err)
- // Verify returns value
maxConsumers, err =
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
assert.NoError(t, err)
+ assert.Equal(t, initialMaxConsumers, maxConsumers)
+}
+
+func TestNamespaces_MaxConsumersPerSubscription(t *testing.T) {
+ config := &config.Config{}
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespace, _ := utils.GetNamespaceName("public/default")
+
+ maxConsumers, err :=
admin.Namespaces().GetMaxConsumersPerSubscription(*namespace)
+ assert.NoError(t, err)
+ initialMaxConsumers := maxConsumers
+
+ err = admin.Namespaces().SetMaxConsumersPerSubscription(*namespace, 100)
+ assert.NoError(t, err)
+
+ maxConsumers, err =
admin.Namespaces().GetMaxConsumersPerSubscription(*namespace)
+ assert.NoError(t, err)
assert.Equal(t, 100, maxConsumers)
+
+ err = admin.Namespaces().RemoveMaxConsumersPerSubscription(*namespace)
+ assert.NoError(t, err)
+
+ maxConsumers, err =
admin.Namespaces().GetMaxConsumersPerSubscription(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, initialMaxConsumers, maxConsumers)
}
func TestNamespaces_CompactionThreshold(t *testing.T) {
@@ -738,28 +755,23 @@ func TestNamespaces_MaxProducersPerTopic(t *testing.T) {
namespace, _ := utils.GetNamespaceName("public/default")
- // Get default (should be -1)
maxProducers, err :=
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
assert.NoError(t, err)
- assert.Equal(t, -1, maxProducers)
+ initialMaxProducers := maxProducers
- // Set to 0 explicitly
- err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 0)
+ err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 50)
assert.NoError(t, err)
- // Verify returns 0
maxProducers, err =
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
assert.NoError(t, err)
- assert.Equal(t, 0, maxProducers)
+ assert.Equal(t, 50, maxProducers)
- // Set to positive value
- err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 50)
+ err = admin.Namespaces().RemoveMaxProducersPerTopic(*namespace)
assert.NoError(t, err)
- // Verify returns value
maxProducers, err =
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
assert.NoError(t, err)
- assert.Equal(t, 50, maxProducers)
+ assert.Equal(t, initialMaxProducers, maxProducers)
}
func TestNamespaces_Retention(t *testing.T) {
@@ -770,12 +782,10 @@ func TestNamespaces_Retention(t *testing.T) {
namespaceName := "public/default"
- // Initial state: policy not configured, should return nil
retention, err := admin.Namespaces().GetRetention(namespaceName)
assert.NoError(t, err)
- assert.Nil(t, retention, "Expected nil when retention is not
configured")
+ initialRetention := retention
- // Set new retention policy
newRetention := utils.RetentionPolicies{
RetentionSizeInMB: 1024,
RetentionTimeInMinutes: 60,
@@ -783,12 +793,18 @@ func TestNamespaces_Retention(t *testing.T) {
err = admin.Namespaces().SetRetention(namespaceName, newRetention)
assert.NoError(t, err)
- // Verify retention is set
retention, err = admin.Namespaces().GetRetention(namespaceName)
assert.NoError(t, err)
- assert.NotNil(t, retention, "Expected non-nil when retention is
configured")
+ assert.NotNil(t, retention)
assert.Equal(t, int64(1024), retention.RetentionSizeInMB)
assert.Equal(t, 60, retention.RetentionTimeInMinutes)
+
+ err = admin.Namespaces().RemoveRetention(namespaceName)
+ assert.NoError(t, err)
+
+ retention, err = admin.Namespaces().GetRetention(namespaceName)
+ assert.NoError(t, err)
+ assert.Equal(t, initialRetention, retention)
}
func TestNamespaces_BookieAffinityGroup(t *testing.T) {