This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c0db4828 Use -1 as sentinel value for namespace and topic admin
commands (#1430)
c0db4828 is described below
commit c0db48286e9e50f47fb0f6e9d5187dfe945fc5e9
Author: Kai <[email protected]>
AuthorDate: Thu Oct 23 00:47:57 2025 -0700
Use -1 as sentinel value for namespace and topic admin commands (#1430)
Fixes #1429
### Motivation
Use -1 as a sentinel value for all the namespace / topic admin get commands
whose response is an empty body when the policy is unset, so that callers can
distinguish "unset" (-1) from an explicitly configured 0.
### Modifications
The following get commands now return -1 when the value is not set:
##### Namespace Admin Methods
- GetNamespaceMessageTTL / GetNamespaceMessageTTLWithContext
- GetMaxConsumersPerTopic / GetMaxConsumersPerTopicWithContext
- GetMaxProducersPerTopic / GetMaxProducersPerTopicWithContext
- GetMaxConsumersPerSubscription /
GetMaxConsumersPerSubscriptionWithContext
- GetOffloadThreshold / GetOffloadThresholdWithContext
- GetOffloadThresholdInSeconds / GetOffloadThresholdInSecondsWithContext
- GetOffloadDeleteLag / GetOffloadDeleteLagWithContext
- GetCompactionThreshold / GetCompactionThresholdWithContext
##### Topic Admin Methods
- GetMessageTTL / GetMessageTTLWithContext
- GetMaxProducers / GetMaxProducersWithContext
- GetMaxConsumers / GetMaxConsumersWithContext
- GetMaxUnackMessagesPerConsumer /
GetMaxUnackMessagesPerConsumerWithContext
- GetMaxUnackMessagesPerSubscription /
GetMaxUnackMessagesPerSubscriptionWithContext
- GetCompactionThreshold / GetCompactionThresholdWithContext
- GetMaxConsumersPerSubscription /
GetMaxConsumersPerSubscriptionWithContext
- GetMaxMessageSize / GetMaxMessageSizeWithContext
- GetMaxSubscriptionsPerTopic / GetMaxSubscriptionsPerTopicWithContext
- GetDeduplicationSnapshotInterval /
GetDeduplicationSnapshotIntervalWithContext
---
pulsaradmin/pkg/admin/namespace.go | 50 +++----
pulsaradmin/pkg/admin/namespace_test.go | 166 ++++++++++++++++++++++-
pulsaradmin/pkg/admin/topic.go | 64 ++++-----
pulsaradmin/pkg/admin/topic_test.go | 228 ++++++++++++++++++++++++++++++--
4 files changed, 443 insertions(+), 65 deletions(-)
diff --git a/pulsaradmin/pkg/admin/namespace.go
b/pulsaradmin/pkg/admin/namespace.go
index fdaa309b..6114f5ce 100644
--- a/pulsaradmin/pkg/admin/namespace.go
+++ b/pulsaradmin/pkg/admin/namespace.go
@@ -88,10 +88,10 @@ type Namespaces interface {
// SetNamespaceMessageTTLWithContext sets the messages Time to Live for
all the topics within a namespace
SetNamespaceMessageTTLWithContext(ctx context.Context, namespace
string, ttlInSeconds int) error
- // GetNamespaceMessageTTL returns the message TTL for a namespace
+ // GetNamespaceMessageTTL returns the message TTL for a namespace.
Returns -1 if not set
GetNamespaceMessageTTL(namespace string) (int, error)
- // GetNamespaceMessageTTLWithContext returns the message TTL for a
namespace
+ // GetNamespaceMessageTTLWithContext returns the message TTL for a
namespace. Returns -1 if not set
GetNamespaceMessageTTLWithContext(ctx context.Context, namespace
string) (int, error)
// GetRetention returns the retention configuration for a namespace
@@ -226,10 +226,11 @@ type Namespaces interface {
// SetOffloadDeleteLagWithContext sets the offload deletion lag for a
namespace
SetOffloadDeleteLagWithContext(ctx context.Context, namespace
utils.NameSpaceName, timeMs int64) error
- // GetOffloadDeleteLag returns the offload deletion lag for a
namespace, in milliseconds
+ // GetOffloadDeleteLag returns the offload deletion lag for a
namespace, in milliseconds. Returns -1 if not set
GetOffloadDeleteLag(namespace utils.NameSpaceName) (int64, error)
- // GetOffloadDeleteLagWithContext returns the offload deletion lag for
a namespace, in milliseconds
+ // GetOffloadDeleteLagWithContext returns the offload deletion lag for
a namespace, in milliseconds.
+ // Returns -1 if not set
GetOffloadDeleteLagWithContext(ctx context.Context, namespace
utils.NameSpaceName) (int64, error)
// SetOffloadThreshold sets the offloadThreshold for a namespace
@@ -238,10 +239,10 @@ type Namespaces interface {
// SetOffloadThresholdWithContext sets the offloadThreshold for a
namespace
SetOffloadThresholdWithContext(ctx context.Context, namespace
utils.NameSpaceName, threshold int64) error
- // GetOffloadThreshold returns the offloadThreshold for a namespace
+ // GetOffloadThreshold returns the offloadThreshold for a namespace.
Returns -1 if not set
GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error)
- // GetOffloadThresholdWithContext returns the offloadThreshold for a
namespace
+ // GetOffloadThresholdWithContext returns the offloadThreshold for a
namespace. Returns -1 if not set
GetOffloadThresholdWithContext(ctx context.Context, namespace
utils.NameSpaceName) (int64, error)
// SetOffloadThresholdInSeconds sets the offloadThresholdInSeconds for
a namespace
@@ -250,10 +251,10 @@ type Namespaces interface {
// SetOffloadThresholdInSecondsWithContext sets the
offloadThresholdInSeconds for a namespace
SetOffloadThresholdInSecondsWithContext(ctx context.Context, namespace
utils.NameSpaceName, threshold int64) error
- // GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds
for a namespace
+ // GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds
for a namespace. Returns -1 if not set
GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64,
error)
- // GetOffloadThresholdInSecondsWithContext returns the
offloadThresholdInSeconds for a namespace
+ // GetOffloadThresholdInSecondsWithContext returns the
offloadThresholdInSeconds for a namespace. Returns -1 if not set
GetOffloadThresholdInSecondsWithContext(ctx context.Context, namespace
utils.NameSpaceName) (int64, error)
// SetCompactionThreshold sets the compactionThreshold for a namespace
@@ -262,10 +263,10 @@ type Namespaces interface {
// SetCompactionThresholdWithContext sets the compactionThreshold for a
namespace
SetCompactionThresholdWithContext(ctx context.Context, namespace
utils.NameSpaceName, threshold int64) error
- // GetCompactionThreshold returns the compactionThreshold for a
namespace
+ // GetCompactionThreshold returns the compactionThreshold for a
namespace. Returns -1 if not set
GetCompactionThreshold(namespace utils.NameSpaceName) (int64, error)
- // GetCompactionThresholdWithContext returns the compactionThreshold
for a namespace
+ // GetCompactionThresholdWithContext returns the compactionThreshold
for a namespace. Returns -1 if not set
GetCompactionThresholdWithContext(ctx context.Context, namespace
utils.NameSpaceName) (int64, error)
// SetMaxConsumersPerSubscription sets maxConsumersPerSubscription for
a namespace.
@@ -276,10 +277,11 @@ type Namespaces interface {
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxConsumersPerSubscriptionWithContext(ctx context.Context,
namespace utils.NameSpaceName, max int) error
- // GetMaxConsumersPerSubscription returns the
maxConsumersPerSubscription for a namespace.
+ // GetMaxConsumersPerSubscription returns the
maxConsumersPerSubscription for a namespace. Returns -1 if not set
GetMaxConsumersPerSubscription(namespace utils.NameSpaceName) (int,
error)
// GetMaxConsumersPerSubscriptionWithContext returns the
maxConsumersPerSubscription for a namespace.
+ // Returns -1 if not set
GetMaxConsumersPerSubscriptionWithContext(ctx context.Context,
namespace utils.NameSpaceName) (int, error)
// SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace.
@@ -290,10 +292,10 @@ type Namespaces interface {
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxConsumersPerTopicWithContext(ctx context.Context, namespace
utils.NameSpaceName, max int) error
- // GetMaxConsumersPerTopic returns the maxProducersPerTopic for a
namespace.
+ // GetMaxConsumersPerTopic returns the maxProducersPerTopic for a
namespace. Returns -1 if not set
GetMaxConsumersPerTopic(namespace utils.NameSpaceName) (int, error)
- // GetMaxConsumersPerTopicWithContext returns the maxProducersPerTopic
for a namespace.
+ // GetMaxConsumersPerTopicWithContext returns the maxProducersPerTopic
for a namespace. Returns -1 if not set
GetMaxConsumersPerTopicWithContext(ctx context.Context, namespace
utils.NameSpaceName) (int, error)
// SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace.
@@ -304,10 +306,10 @@ type Namespaces interface {
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxProducersPerTopicWithContext(ctx context.Context, namespace
utils.NameSpaceName, max int) error
- // GetMaxProducersPerTopic returns the maxProducersPerTopic for a
namespace.
+ // GetMaxProducersPerTopic returns the maxProducersPerTopic for a
namespace. Returns -1 if not set
GetMaxProducersPerTopic(namespace utils.NameSpaceName) (int, error)
- // GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic
for a namespace.
+ // GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic
for a namespace. Returns -1 if not set
GetMaxProducersPerTopicWithContext(ctx context.Context, namespace
utils.NameSpaceName) (int, error)
// SetMaxTopicsPerNamespace sets maxTopicsPerNamespace for a namespace.
@@ -851,7 +853,7 @@ func (n *namespaces) GetNamespaceMessageTTL(namespace
string) (int, error) {
}
func (n *namespaces) GetNamespaceMessageTTLWithContext(ctx context.Context,
namespace string) (int, error) {
- var ttl int
+ var ttl = -1
nsName, err := utils.GetNamespaceName(namespace)
if err != nil {
return 0, err
@@ -1114,7 +1116,7 @@ func (n *namespaces) GetOffloadDeleteLag(namespace
utils.NameSpaceName) (int64,
}
func (n *namespaces) GetOffloadDeleteLagWithContext(ctx context.Context,
namespace utils.NameSpaceName) (int64, error) {
- var result int64
+ var result int64 = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"offloadDeletionLagMs")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
@@ -1143,7 +1145,7 @@ func (n *namespaces)
GetMaxConsumersPerSubscriptionWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int, error) {
- var result int
+ var result = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"maxConsumersPerSubscription")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
@@ -1167,7 +1169,7 @@ func (n *namespaces) GetOffloadThreshold(namespace
utils.NameSpaceName) (int64,
}
func (n *namespaces) GetOffloadThresholdWithContext(ctx context.Context,
namespace utils.NameSpaceName) (int64, error) {
- var result int64
+ var result int64 = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"offloadThreshold")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
@@ -1194,7 +1196,7 @@ func (n *namespaces)
GetOffloadThresholdInSecondsWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int64, error) {
- var result int64
+ var result int64 = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"offloadThresholdInSeconds")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
@@ -1223,7 +1225,7 @@ func (n *namespaces) GetMaxConsumersPerTopicWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int, error) {
- var result int
+ var result = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"maxConsumersPerTopic")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
@@ -1250,7 +1252,7 @@ func (n *namespaces) GetCompactionThresholdWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int64, error) {
- var result int64
+ var result int64 = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"compactionThreshold")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
@@ -1279,7 +1281,7 @@ func (n *namespaces) GetMaxProducersPerTopicWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int, error) {
- var result int
+ var result = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"maxProducersPerTopic")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
@@ -2014,7 +2016,7 @@ func (n *namespaces) GetMaxTopicsPerNamespaceWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int, error) {
- var result int
+ var result int // This method does not require a sentinel value of -1
since the API never returns empty
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"maxTopicsPerNamespace")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
diff --git a/pulsaradmin/pkg/admin/namespace_test.go
b/pulsaradmin/pkg/admin/namespace_test.go
index 6600cfaf..cfa44951 100644
--- a/pulsaradmin/pkg/admin/namespace_test.go
+++ b/pulsaradmin/pkg/admin/namespace_test.go
@@ -333,7 +333,11 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t
*testing.T) {
namespace, _ := utils.GetNamespaceName("public/default")
- // set the subscription expiration time and get it
+ // Get default (should be -1)
+ threshold, err := admin.Namespaces().GetOffloadThreshold(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(-1), threshold)
+
err = admin.Namespaces().SetOffloadThresholdInSeconds(*namespace,
60)
assert.Equal(t, nil, err)
@@ -599,3 +603,163 @@ func TestNamespaces_GetMaxTopicsPerNamespace(t
*testing.T) {
expected = 0
assert.Equal(t, expected, maxTopics)
}
+
+func TestNamespaces_MessageTTL(t *testing.T) {
+ config := &config.Config{}
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespace, _ := utils.GetNamespaceName("public/default")
+
+ // Get default (should be -1)
+ ttl, err :=
admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
+ assert.NoError(t, err)
+ assert.Equal(t, -1, ttl)
+
+ // Set to 0 explicitly
+ err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(), 0)
+ assert.NoError(t, err)
+
+ // Verify returns 0
+ ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
+ assert.NoError(t, err)
+ assert.Equal(t, 0, ttl)
+
+ // Set to positive value
+ err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(),
3600)
+ assert.NoError(t, err)
+
+ // Verify returns value
+ ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
+ assert.NoError(t, err)
+ assert.Equal(t, 3600, ttl)
+}
+
+func TestNamespaces_OffloadDeleteLag(t *testing.T) {
+ config := &config.Config{}
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespace, _ := utils.GetNamespaceName("public/default")
+
+ // Get default (should be -1)
+ lag, err := admin.Namespaces().GetOffloadDeleteLag(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(-1), lag)
+
+ // Set to 0 explicitly
+ err = admin.Namespaces().SetOffloadDeleteLag(*namespace, 0)
+ assert.NoError(t, err)
+
+ // Verify returns 0
+ lag, err = admin.Namespaces().GetOffloadDeleteLag(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), lag)
+
+ // Set to positive value
+ err = admin.Namespaces().SetOffloadDeleteLag(*namespace, 1000)
+ assert.NoError(t, err)
+
+ // Verify returns value
+ lag, err = admin.Namespaces().GetOffloadDeleteLag(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1000), lag)
+}
+
+func TestNamespaces_MaxConsumersPerTopic(t *testing.T) {
+ config := &config.Config{}
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespace, _ := utils.GetNamespaceName("public/default")
+
+ // Get default (should be -1)
+ maxConsumers, err :=
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, -1, maxConsumers)
+
+ // Set to 0 explicitly
+ err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 0)
+ assert.NoError(t, err)
+
+ // Verify returns 0
+ maxConsumers, err =
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, maxConsumers)
+
+ // Set to positive value
+ err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 100)
+ assert.NoError(t, err)
+
+ // Verify returns value
+ maxConsumers, err =
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, 100, maxConsumers)
+}
+
+func TestNamespaces_CompactionThreshold(t *testing.T) {
+ config := &config.Config{}
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespace, _ := utils.GetNamespaceName("public/default")
+
+ // Get default (should be -1)
+ threshold, err := admin.Namespaces().GetCompactionThreshold(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(-1), threshold)
+
+ // Set to 0 explicitly
+ err = admin.Namespaces().SetCompactionThreshold(*namespace, 0)
+ assert.NoError(t, err)
+
+ // Verify returns 0
+ threshold, err = admin.Namespaces().GetCompactionThreshold(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), threshold)
+
+ // Set to positive value
+ err = admin.Namespaces().SetCompactionThreshold(*namespace, 1024*1024)
// 1MB
+ assert.NoError(t, err)
+
+ // Verify returns value
+ threshold, err = admin.Namespaces().GetCompactionThreshold(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1024*1024), threshold)
+}
+
+func TestNamespaces_MaxProducersPerTopic(t *testing.T) {
+ config := &config.Config{}
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespace, _ := utils.GetNamespaceName("public/default")
+
+ // Get default (should be -1)
+ maxProducers, err :=
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, -1, maxProducers)
+
+ // Set to 0 explicitly
+ err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 0)
+ assert.NoError(t, err)
+
+ // Verify returns 0
+ maxProducers, err =
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, maxProducers)
+
+ // Set to positive value
+ err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 50)
+ assert.NoError(t, err)
+
+ // Verify returns value
+ maxProducers, err =
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
+ assert.NoError(t, err)
+ assert.Equal(t, 50, maxProducers)
+}
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index 2e9e155f..37bbe94a 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -390,10 +390,10 @@ type Topics interface {
// CompactStatusWithContext checks the status of an ongoing compaction
for a topic
CompactStatusWithContext(context.Context, utils.TopicName)
(utils.LongRunningProcessStatus, error)
- // GetMessageTTL returns the message TTL for a topic
+ // GetMessageTTL returns the message TTL for a topic. Returns -1 if not
set
GetMessageTTL(utils.TopicName) (int, error)
- // GetMessageTTLWithContext returns the message TTL for a topic
+ // GetMessageTTLWithContext returns the message TTL for a topic.
Returns -1 if not set
GetMessageTTLWithContext(context.Context, utils.TopicName) (int, error)
// SetMessageTTL sets the message TTL for a topic
@@ -420,10 +420,10 @@ type Topics interface {
// RemoveMessageTTLWithContext removes the message TTL for a topic
RemoveMessageTTLWithContext(context.Context, utils.TopicName) error
- // GetMaxProducers Get max number of producers for a topic
+ // GetMaxProducers Get max number of producers for a topic. Returns -1
if not set
GetMaxProducers(utils.TopicName) (int, error)
- // GetMaxProducersWithContext Get max number of producers for a topic
+ // GetMaxProducersWithContext Get max number of producers for a topic.
Returns -1 if not set
GetMaxProducersWithContext(context.Context, utils.TopicName) (int,
error)
// SetMaxProducers sets max number of producers for a topic
@@ -450,10 +450,10 @@ type Topics interface {
// RemoveMaxProducersWithContext removes max number of producers for a
topic
RemoveMaxProducersWithContext(context.Context, utils.TopicName) error
- // GetMaxConsumers returns max number of consumers for a topic
+ // GetMaxConsumers returns max number of consumers for a topic. Returns
-1 if not set
GetMaxConsumers(utils.TopicName) (int, error)
- // GetMaxConsumersWithContext returns max number of consumers for a
topic
+ // GetMaxConsumersWithContext returns max number of consumers for a
topic. Returns -1 if not set
GetMaxConsumersWithContext(context.Context, utils.TopicName) (int,
error)
// SetMaxConsumers sets max number of consumers for a topic
@@ -480,10 +480,11 @@ type Topics interface {
// RemoveMaxConsumersWithContext removes max number of consumers for a
topic
RemoveMaxConsumersWithContext(context.Context, utils.TopicName) error
- // GetMaxUnackMessagesPerConsumer returns max unacked messages policy
on consumer for a topic
+ // GetMaxUnackMessagesPerConsumer returns max unacked messages policy
on consumer for a topic. Returns -1 if not set
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)
- // GetMaxUnackMessagesPerConsumerWithContext returns max unacked
messages policy on consumer for a topic
+ // GetMaxUnackMessagesPerConsumerWithContext returns max unacked
messages policy on consumer for a topic.
+ // Returns -1 if not set
GetMaxUnackMessagesPerConsumerWithContext(context.Context,
utils.TopicName) (int, error)
// SetMaxUnackMessagesPerConsumer sets max unacked messages policy on
consumer for a topic
@@ -510,10 +511,12 @@ type Topics interface {
// RemoveMaxUnackMessagesPerConsumerWithContext removes max unacked
messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumerWithContext(context.Context,
utils.TopicName) error
- // GetMaxUnackMessagesPerSubscription returns max unacked messages
policy on subscription for a topic
+ // GetMaxUnackMessagesPerSubscription returns max unacked messages
policy on subscription for a topic.
+ // Returns -1 if not set
GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)
- // GetMaxUnackMessagesPerSubscriptionWithContext returns max unacked
messages policy on subscription for a topic
+ // GetMaxUnackMessagesPerSubscriptionWithContext returns max unacked
messages policy on subscription for a topic.
+ // Returns -1 if not set
GetMaxUnackMessagesPerSubscriptionWithContext(context.Context,
utils.TopicName) (int, error)
// SetMaxUnackMessagesPerSubscription sets max unacked messages policy
on subscription for a topic
@@ -674,7 +677,7 @@ type Topics interface {
// SetRetentionWithContext sets the retention policy for a topic
SetRetentionWithContext(context.Context, utils.TopicName,
utils.RetentionPolicies) error
- // GetCompactionThreshold returns the compaction threshold for a topic.
+ // GetCompactionThreshold returns the compaction threshold for a topic.
Returns -1 if not set
//
// i.e. The maximum number of bytes can have before compaction is
triggered.
//
@@ -685,7 +688,7 @@ type Topics interface {
// in namespace or broker level, if no policy set in topic level
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64,
error)
- // GetCompactionThresholdWithContext returns the compaction threshold
for a topic.
+ // GetCompactionThresholdWithContext returns the compaction threshold
for a topic. Returns -1 if not set
//
// i.e. The maximum number of bytes can have before compaction is
triggered.
//
@@ -854,10 +857,10 @@ type Topics interface {
// RemoveSubscriptionDispatchRateWithContext removes subscription
dispatch rate for a topic
RemoveSubscriptionDispatchRateWithContext(context.Context,
utils.TopicName) error
- // GetMaxConsumersPerSubscription returns max consumers per
subscription for a topic
+ // GetMaxConsumersPerSubscription returns max consumers per
subscription for a topic. Returns -1 if not set
GetMaxConsumersPerSubscription(utils.TopicName) (int, error)
- // GetMaxConsumersPerSubscriptionWithContext returns max consumers per
subscription for a topic
+ // GetMaxConsumersPerSubscriptionWithContext returns max consumers per
subscription for a topic. Returns -1 if not set
GetMaxConsumersPerSubscriptionWithContext(context.Context,
utils.TopicName) (int, error)
// SetMaxConsumersPerSubscription sets max consumers per subscription
for a topic
@@ -872,10 +875,10 @@ type Topics interface {
// RemoveMaxConsumersPerSubscriptionWithContext removes max consumers
per subscription for a topic
RemoveMaxConsumersPerSubscriptionWithContext(context.Context,
utils.TopicName) error
- // GetMaxMessageSize returns max message size for a topic
+ // GetMaxMessageSize returns max message size for a topic. Returns -1
if not set
GetMaxMessageSize(utils.TopicName) (int, error)
- // GetMaxMessageSizeWithContext returns max message size for a topic
+ // GetMaxMessageSizeWithContext returns max message size for a topic.
Returns -1 if not set
GetMaxMessageSizeWithContext(context.Context, utils.TopicName) (int,
error)
// SetMaxMessageSize sets max message size for a topic
@@ -890,10 +893,10 @@ type Topics interface {
// RemoveMaxMessageSizeWithContext removes max message size for a topic
RemoveMaxMessageSizeWithContext(context.Context, utils.TopicName) error
- // GetMaxSubscriptionsPerTopic returns max subscriptions per topic
+ // GetMaxSubscriptionsPerTopic returns max subscriptions per topic.
Returns -1 if not set
GetMaxSubscriptionsPerTopic(utils.TopicName) (int, error)
- // GetMaxSubscriptionsPerTopicWithContext returns max subscriptions per
topic
+ // GetMaxSubscriptionsPerTopicWithContext returns max subscriptions per
topic. Returns -1 if not set
GetMaxSubscriptionsPerTopicWithContext(context.Context,
utils.TopicName) (int, error)
// SetMaxSubscriptionsPerTopic sets max subscriptions per topic
@@ -926,10 +929,11 @@ type Topics interface {
// RemoveSchemaValidationEnforcedWithContext removes schema validation
enforced flag for a topic
RemoveSchemaValidationEnforcedWithContext(context.Context,
utils.TopicName) error
- // GetDeduplicationSnapshotInterval returns deduplication snapshot
interval for a topic
+ // GetDeduplicationSnapshotInterval returns deduplication snapshot
interval for a topic. Returns -1 if not set
GetDeduplicationSnapshotInterval(utils.TopicName) (int, error)
- // GetDeduplicationSnapshotIntervalWithContext returns deduplication
snapshot interval for a topic
+ // GetDeduplicationSnapshotIntervalWithContext returns deduplication
snapshot interval for a topic.
+ // Returns -1 if not set
GetDeduplicationSnapshotIntervalWithContext(context.Context,
utils.TopicName) (int, error)
// SetDeduplicationSnapshotInterval sets deduplication snapshot
interval for a topic
@@ -1457,7 +1461,7 @@ func (t *topics) GetMessageTTL(topic utils.TopicName)
(int, error) {
}
func (t *topics) GetMessageTTLWithContext(ctx context.Context, topic
utils.TopicName) (int, error) {
- var ttl int
+ var ttl = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"messageTTL")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &ttl)
return ttl, err
@@ -1492,7 +1496,7 @@ func (t *topics) GetMaxProducers(topic utils.TopicName)
(int, error) {
}
func (t *topics) GetMaxProducersWithContext(ctx context.Context, topic
utils.TopicName) (int, error) {
- var maxProducers int
+ var maxProducers = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxProducers")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxProducers)
return maxProducers, err
@@ -1523,7 +1527,7 @@ func (t *topics) GetMaxConsumers(topic utils.TopicName)
(int, error) {
}
func (t *topics) GetMaxConsumersWithContext(ctx context.Context, topic
utils.TopicName) (int, error) {
- var maxConsumers int
+ var maxConsumers = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxConsumers")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
return maxConsumers, err
@@ -1554,7 +1558,7 @@ func (t *topics) GetMaxUnackMessagesPerConsumer(topic
utils.TopicName) (int, err
}
func (t *topics) GetMaxUnackMessagesPerConsumerWithContext(ctx
context.Context, topic utils.TopicName) (int, error) {
- var maxNum int
+ var maxNum = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxUnackedMessagesOnConsumer")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
return maxNum, err
@@ -1590,7 +1594,7 @@ func (t *topics)
GetMaxUnackMessagesPerSubscriptionWithContext(
ctx context.Context,
topic utils.TopicName,
) (int, error) {
- var maxNum int
+ var maxNum = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxUnackedMessagesOnSubscription")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
return maxNum, err
@@ -1833,7 +1837,7 @@ func (t *topics) GetCompactionThresholdWithContext(
topic utils.TopicName,
applied bool,
) (int64, error) {
- var threshold int64
+ var threshold int64 = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"compactionThreshold")
_, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint,
&threshold, map[string]string{
"applied": strconv.FormatBool(applied),
@@ -2042,7 +2046,7 @@ func (t *topics) GetMaxConsumersPerSubscription(topic
utils.TopicName) (int, err
}
func (t *topics) GetMaxConsumersPerSubscriptionWithContext(ctx
context.Context, topic utils.TopicName) (int, error) {
- var maxConsumers int
+ var maxConsumers = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxConsumersPerSubscription")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
return maxConsumers, err
@@ -2075,7 +2079,7 @@ func (t *topics) GetMaxMessageSize(topic utils.TopicName)
(int, error) {
}
func (t *topics) GetMaxMessageSizeWithContext(ctx context.Context, topic
utils.TopicName) (int, error) {
- var maxMessageSize int
+ var maxMessageSize = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxMessageSize")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxMessageSize)
return maxMessageSize, err
@@ -2104,7 +2108,7 @@ func (t *topics) GetMaxSubscriptionsPerTopic(topic
utils.TopicName) (int, error)
}
func (t *topics) GetMaxSubscriptionsPerTopicWithContext(ctx context.Context,
topic utils.TopicName) (int, error) {
- var maxSubscriptions int
+ var maxSubscriptions = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"maxSubscriptionsPerTopic")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxSubscriptions)
return maxSubscriptions, err
@@ -2170,7 +2174,7 @@ func (t *topics) GetDeduplicationSnapshotInterval(topic
utils.TopicName) (int, e
}
func (t *topics) GetDeduplicationSnapshotIntervalWithContext(ctx
context.Context, topic utils.TopicName) (int, error) {
- var interval int
+ var interval = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"deduplicationSnapshotInterval")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &interval)
return interval, err
diff --git a/pulsaradmin/pkg/admin/topic_test.go
b/pulsaradmin/pkg/admin/topic_test.go
index 9abf8027..8493cf76 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -477,7 +477,7 @@ func TestMessageTTL(t *testing.T) {
messageTTL, err := admin.Topics().GetMessageTTL(*topicName)
assert.NoError(t, err)
- assert.Equal(t, 0, messageTTL)
+ assert.Equal(t, -1, messageTTL)
err = admin.Topics().SetMessageTTL(*topicName, 600)
assert.NoError(t, err)
// topic policy is an async operation,
@@ -497,7 +497,7 @@ func TestMessageTTL(t *testing.T) {
t,
func() bool {
messageTTL, err =
admin.Topics().GetMessageTTL(*topicName)
- return err == nil && messageTTL == 0
+ return err == nil && messageTTL == -1
},
10*time.Second,
100*time.Millisecond,
@@ -690,7 +690,7 @@ func TestMaxConsumersPerSubscription(t *testing.T) {
// Get default max consumers per subscription
maxConsumers, err :=
admin.Topics().GetMaxConsumersPerSubscription(*topicName)
assert.NoError(t, err)
- assert.Equal(t, 0, maxConsumers)
+ assert.Equal(t, -1, maxConsumers)
// Set new max consumers per subscription
err = admin.Topics().SetMaxConsumersPerSubscription(*topicName, 10)
@@ -715,7 +715,7 @@ func TestMaxConsumersPerSubscription(t *testing.T) {
t,
func() bool {
maxConsumers, err =
admin.Topics().GetMaxConsumersPerSubscription(*topicName)
- return err == nil && maxConsumers == 0
+ return err == nil && maxConsumers == -1
},
10*time.Second,
100*time.Millisecond,
@@ -737,7 +737,7 @@ func TestMaxMessageSize(t *testing.T) {
// Get default max message size
maxMessageSize, err := admin.Topics().GetMaxMessageSize(*topicName)
assert.NoError(t, err)
- assert.Equal(t, 0, maxMessageSize)
+ assert.Equal(t, -1, maxMessageSize)
// Set new max message size (1MB)
err = admin.Topics().SetMaxMessageSize(*topicName, 1048576)
@@ -762,7 +762,7 @@ func TestMaxMessageSize(t *testing.T) {
t,
func() bool {
maxMessageSize, err =
admin.Topics().GetMaxMessageSize(*topicName)
- return err == nil && maxMessageSize == 0
+ return err == nil && maxMessageSize == -1
},
10*time.Second,
100*time.Millisecond,
@@ -784,7 +784,7 @@ func TestMaxSubscriptionsPerTopic(t *testing.T) {
// Get default max subscriptions per topic
maxSubscriptions, err :=
admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
assert.NoError(t, err)
- assert.Equal(t, 0, maxSubscriptions)
+ assert.Equal(t, -1, maxSubscriptions)
// Set new max subscriptions per topic
err = admin.Topics().SetMaxSubscriptionsPerTopic(*topicName, 100)
@@ -809,7 +809,7 @@ func TestMaxSubscriptionsPerTopic(t *testing.T) {
t,
func() bool {
maxSubscriptions, err =
admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
- return err == nil && maxSubscriptions == 0
+ return err == nil && maxSubscriptions == -1
},
10*time.Second,
100*time.Millisecond,
@@ -899,7 +899,7 @@ func TestDeduplicationSnapshotInterval(t *testing.T) {
// Get default deduplication snapshot interval
interval, err :=
admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
assert.NoError(t, err)
- assert.Equal(t, 0, interval)
+ assert.Equal(t, -1, interval)
// Set new deduplication snapshot interval
err = admin.Topics().SetDeduplicationSnapshotInterval(*topicName, 1000)
@@ -924,7 +924,7 @@ func TestDeduplicationSnapshotInterval(t *testing.T) {
t,
func() bool {
interval, err =
admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
- return err == nil && interval == 0
+ return err == nil && interval == -1
},
10*time.Second,
100*time.Millisecond,
@@ -1161,3 +1161,211 @@ func TestSchemaCompatibilityStrategy(t *testing.T) {
100*time.Millisecond,
)
}
+
+func TestTopics_MaxProducers(t *testing.T) { // unset reads back as -1; explicit 0 and 10 read back as set
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Nothing has been set on the new topic yet, so the getter must report the -1 sentinel rather than 0
+ maxProducers, err := admin.Topics().GetMaxProducers(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, -1, maxProducers)
+
+ // Set to 0 explicitly (unlimited); an explicit 0 must stay distinguishable from the -1 "unset" sentinel
+ err = admin.Topics().SetMaxProducers(*topicName, 0)
+ assert.NoError(t, err)
+
+ // Topic policy is an async operation, so poll until the getter returns the explicit 0
+ assert.Eventually(
+ t,
+ func() bool {
+ maxProducers, err =
admin.Topics().GetMaxProducers(*topicName)
+ return err == nil && maxProducers == 0
+ },
+ 10*time.Second, // overall deadline for the condition to become true
+ 100*time.Millisecond, // interval between polls
+ )
+
+ // Overwrite the explicit 0 with a positive limit
+ err = admin.Topics().SetMaxProducers(*topicName, 10)
+ assert.NoError(t, err)
+
+ // Poll until the getter reflects the new limit of 10
+ assert.Eventually(
+ t,
+ func() bool {
+ maxProducers, err =
admin.Topics().GetMaxProducers(*topicName)
+ return err == nil && maxProducers == 10
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove the topic-level setting; only the call's success is checked, the value read back afterwards is not asserted
+ err = admin.Topics().RemoveMaxProducers(*topicName)
+ assert.NoError(t, err)
+}
+
+func TestTopics_MaxConsumers(t *testing.T) { // unset reads back as -1; explicit 0 and 20 read back as set
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Nothing has been set on the new topic yet, so the getter must report the -1 sentinel rather than 0
+ maxConsumers, err := admin.Topics().GetMaxConsumers(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, -1, maxConsumers)
+
+ // Set to 0 explicitly; an explicit 0 must stay distinguishable from the -1 "unset" sentinel
+ err = admin.Topics().SetMaxConsumers(*topicName, 0)
+ assert.NoError(t, err)
+
+ // Topic policy is an async operation, so poll until the getter returns the explicit 0
+ assert.Eventually(
+ t,
+ func() bool {
+ maxConsumers, err =
admin.Topics().GetMaxConsumers(*topicName)
+ return err == nil && maxConsumers == 0
+ },
+ 10*time.Second, // overall deadline for the condition to become true
+ 100*time.Millisecond, // interval between polls
+ )
+
+ // Overwrite the explicit 0 with a positive limit
+ err = admin.Topics().SetMaxConsumers(*topicName, 20)
+ assert.NoError(t, err)
+
+ // Poll until the getter reflects the new limit of 20
+ assert.Eventually(
+ t,
+ func() bool {
+ maxConsumers, err =
admin.Topics().GetMaxConsumers(*topicName)
+ return err == nil && maxConsumers == 20
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove the topic-level setting; only the call's success is checked, the value read back afterwards is not asserted
+ err = admin.Topics().RemoveMaxConsumers(*topicName)
+ assert.NoError(t, err)
+}
+
+func TestTopics_MaxUnackMessagesPerConsumer(t *testing.T) { // unset reads back as -1; explicit 0 and 1000 read back as set
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Nothing has been set on the new topic yet, so the getter must report the -1 sentinel rather than 0
+ maxUnack, err :=
admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, -1, maxUnack)
+
+ // Set to 0 explicitly; an explicit 0 must stay distinguishable from the -1 "unset" sentinel
+ err = admin.Topics().SetMaxUnackMessagesPerConsumer(*topicName, 0)
+ assert.NoError(t, err)
+
+ // Topic policy is an async operation, so poll until the getter returns the explicit 0
+ assert.Eventually(
+ t,
+ func() bool {
+ maxUnack, err =
admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
+ return err == nil && maxUnack == 0
+ },
+ 10*time.Second, // overall deadline for the condition to become true
+ 100*time.Millisecond, // interval between polls
+ )
+
+ // Overwrite the explicit 0 with a positive limit
+ err = admin.Topics().SetMaxUnackMessagesPerConsumer(*topicName, 1000)
+ assert.NoError(t, err)
+
+ // Poll until the getter reflects the new limit of 1000
+ assert.Eventually(
+ t,
+ func() bool {
+ maxUnack, err =
admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
+ return err == nil && maxUnack == 1000
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove the topic-level setting; only the call's success is checked, the value read back afterwards is not asserted
+ err = admin.Topics().RemoveMaxUnackMessagesPerConsumer(*topicName)
+ assert.NoError(t, err)
+}
+
+func TestTopics_MaxUnackMessagesPerSubscription(t *testing.T) { // unset reads back as -1; explicit 0 and 5000 read back as set
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Nothing has been set on the new topic yet, so the getter must report the -1 sentinel rather than 0
+ maxUnack, err :=
admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, -1, maxUnack)
+
+ // Set to 0 explicitly; an explicit 0 must stay distinguishable from the -1 "unset" sentinel
+ err = admin.Topics().SetMaxUnackMessagesPerSubscription(*topicName, 0)
+ assert.NoError(t, err)
+
+ // Topic policy is an async operation, so poll until the getter returns the explicit 0
+ assert.Eventually(
+ t,
+ func() bool {
+ maxUnack, err =
admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
+ return err == nil && maxUnack == 0
+ },
+ 10*time.Second, // overall deadline for the condition to become true
+ 100*time.Millisecond, // interval between polls
+ )
+
+ // Overwrite the explicit 0 with a positive limit
+ err = admin.Topics().SetMaxUnackMessagesPerSubscription(*topicName,
5000)
+ assert.NoError(t, err)
+
+ // Poll until the getter reflects the new limit of 5000
+ assert.Eventually(
+ t,
+ func() bool {
+ maxUnack, err =
admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
+ return err == nil && maxUnack == 5000
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove the topic-level setting; only the call's success is checked, the value read back afterwards is not asserted
+ err = admin.Topics().RemoveMaxUnackMessagesPerSubscription(*topicName)
+ assert.NoError(t, err)
+}