This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 50efe422 Update namespace & topic admin methods to return nil if unset (#1433)
50efe422 is described below
commit 50efe422e74f98ddf93b2dbf124cb316ec2d92e2
Author: Kai <[email protected]>
AuthorDate: Thu Oct 30 01:06:18 2025 -0700
Update namespace & topic admin methods to return nil if unset (#1433)
Co-authored-by: Copilot <[email protected]>
Co-authored-by: crossoverJie <[email protected]>
---
pulsaradmin/pkg/admin/namespace.go | 52 ++++--
pulsaradmin/pkg/admin/namespace_test.go | 111 ++++++++++++-
pulsaradmin/pkg/admin/topic.go | 130 ++++++++++-----
pulsaradmin/pkg/admin/topic_test.go | 285 ++++++++++++++++++++++++++------
pulsaradmin/pkg/rest/client.go | 27 ++-
5 files changed, 492 insertions(+), 113 deletions(-)
diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go
index 6114f5ce..d5d33f12 100644
--- a/pulsaradmin/pkg/admin/namespace.go
+++ b/pulsaradmin/pkg/admin/namespace.go
@@ -94,10 +94,12 @@ type Namespaces interface {
// GetNamespaceMessageTTLWithContext returns the message TTL for a
namespace. Returns -1 if not set
GetNamespaceMessageTTLWithContext(ctx context.Context, namespace
string) (int, error)
- // GetRetention returns the retention configuration for a namespace
+ // GetRetention returns the retention configuration for a namespace.
+ // Returns nil if the retention policy is not configured at the
namespace level.
GetRetention(namespace string) (*utils.RetentionPolicies, error)
- // GetRetentionWithContext returns the retention configuration for a
namespace
+ // GetRetentionWithContext returns the retention configuration for a
namespace.
+ // Returns nil if the retention policy is not configured at the
namespace level.
GetRetentionWithContext(ctx context.Context, namespace string)
(*utils.RetentionPolicies, error)
// SetRetention sets the retention configuration for all the topics on
a namespace
@@ -132,10 +134,12 @@ type Namespaces interface {
// RemoveBacklogQuotaWithContext removes a backlog quota policy from a
namespace
RemoveBacklogQuotaWithContext(ctx context.Context, namespace string)
error
- // GetTopicAutoCreation returns the topic auto-creation config for a
namespace
+ // GetTopicAutoCreation returns the topic auto-creation config for a
namespace.
+ // Returns nil if the topic auto-creation config is not configured at
the namespace level.
GetTopicAutoCreation(namespace utils.NameSpaceName)
(*utils.TopicAutoCreationConfig, error)
- // GetTopicAutoCreationWithContext returns the topic auto-creation
config for a namespace
+ // GetTopicAutoCreationWithContext returns the topic auto-creation
config for a namespace.
+ // Returns nil if the topic auto-creation config is not configured at
the namespace level.
GetTopicAutoCreationWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
@@ -392,10 +396,12 @@ type Namespaces interface {
// SetPersistenceWithContext sets the persistence configuration for all
the topics on a namespace
SetPersistenceWithContext(ctx context.Context, namespace string,
persistence utils.PersistencePolicies) error
- // GetPersistence returns the persistence configuration for a namespace
+ // GetPersistence returns the persistence configuration for a namespace.
+ // Returns nil if the persistence policy is not configured at the
namespace level.
GetPersistence(namespace string) (*utils.PersistencePolicies, error)
- // GetPersistenceWithContext returns the persistence configuration for
a namespace
+ // GetPersistenceWithContext returns the persistence configuration for
a namespace.
+ // Returns nil if the persistence policy is not configured at the
namespace level.
GetPersistenceWithContext(ctx context.Context, namespace string)
(*utils.PersistencePolicies, error)
// SetBookieAffinityGroup sets bookie affinity group for a namespace to
isolate namespace write to bookies that are
@@ -416,10 +422,12 @@ type Namespaces interface {
// DeleteBookieAffinityGroupWithContext deletes bookie affinity group
configured for a namespace
DeleteBookieAffinityGroupWithContext(ctx context.Context, namespace
string) error
- // GetBookieAffinityGroup returns bookie affinity group configured for
a namespace
+ // GetBookieAffinityGroup returns bookie affinity group configured for
a namespace.
+ // Returns nil if the bookie affinity group is not configured at the
namespace level.
GetBookieAffinityGroup(namespace string)
(*utils.BookieAffinityGroupData, error)
- // GetBookieAffinityGroupWithContext returns bookie affinity group
configured for a namespace
+ // GetBookieAffinityGroupWithContext returns bookie affinity group
configured for a namespace.
+ // Returns nil if the bookie affinity group is not configured at the
namespace level.
GetBookieAffinityGroupWithContext(ctx context.Context, namespace
string) (*utils.BookieAffinityGroupData, error)
// Unload a namespace from the current serving broker
@@ -905,8 +913,11 @@ func (n *namespaces) GetRetentionWithContext(ctx
context.Context, namespace stri
return nil, err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "retention")
- err = n.pulsar.Client.GetWithContext(ctx, endpoint, &policy)
- return &policy, err
+ body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &policy)
+ if body != nil {
+ return &policy, err
+ }
+ return nil, err
}
func (n *namespaces) GetBacklogQuotaMap(namespace string)
(map[utils.BacklogQuotaType]utils.BacklogQuota, error) {
@@ -970,8 +981,11 @@ func (n *namespaces) GetTopicAutoCreationWithContext(
) (*utils.TopicAutoCreationConfig, error) {
var topicAutoCreation utils.TopicAutoCreationConfig
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(),
"autoTopicCreation")
- err := n.pulsar.Client.GetWithContext(ctx, endpoint, &topicAutoCreation)
- return &topicAutoCreation, err
+ body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&topicAutoCreation)
+ if body != nil {
+ return &topicAutoCreation, err
+ }
+ return nil, err
}
func (n *namespaces) SetTopicAutoCreation(namespace utils.NameSpaceName,
config utils.TopicAutoCreationConfig) error {
@@ -1463,8 +1477,11 @@ func (n *namespaces) GetBookieAffinityGroupWithContext(
return nil, err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(),
"persistence", "bookieAffinity")
- err = n.pulsar.Client.GetWithContext(ctx, endpoint, &data)
- return &data, err
+ body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &data)
+ if body != nil {
+ return &data, err
+ }
+ return nil, err
}
func (n *namespaces) GetPersistence(namespace string)
(*utils.PersistencePolicies, error) {
@@ -1481,8 +1498,11 @@ func (n *namespaces) GetPersistenceWithContext(
return nil, err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(),
"persistence")
- err = n.pulsar.Client.GetWithContext(ctx, endpoint, &persistence)
- return &persistence, err
+ body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&persistence)
+ if body != nil {
+ return &persistence, err
+ }
+ return nil, err
}
func (n *namespaces) Unload(namespace string) error {
diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go
index cfa44951..1ca235f5 100644
--- a/pulsaradmin/pkg/admin/namespace_test.go
+++ b/pulsaradmin/pkg/admin/namespace_test.go
@@ -18,6 +18,7 @@
package admin
import (
+ "os"
"testing"
"time"
@@ -156,23 +157,20 @@ func TestGetTopicAutoCreation(t *testing.T) {
assert.Equal(t, nil, err)
topicAutoCreation, err :=
admin.Namespaces().GetTopicAutoCreation(*namespace)
assert.Equal(t, nil, err)
+ assert.NotNil(t, topicAutoCreation, "Expected non-nil when topic auto
creation is configured")
expected := utils.TopicAutoCreationConfig{
Allow: true,
Type: utils.NonPartitioned,
}
assert.Equal(t, expected, *topicAutoCreation)
- // remove the topic auto creation config and get it
+ // remove the topic auto creation config and get it - should return nil
err = admin.Namespaces().RemoveTopicAutoCreation(*namespace)
assert.Equal(t, nil, err)
topicAutoCreation, err =
admin.Namespaces().GetTopicAutoCreation(*namespace)
assert.Equal(t, nil, err)
- expected = utils.TopicAutoCreationConfig{
- Allow: false,
- Type: "",
- }
- assert.Equal(t, expected, *topicAutoCreation)
+ assert.Nil(t, topicAutoCreation, "Expected nil when topic auto creation
is not configured")
}
func TestRevokeSubPermission(t *testing.T) {
@@ -763,3 +761,104 @@ func TestNamespaces_MaxProducersPerTopic(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 50, maxProducers)
}
+
+func TestNamespaces_Retention(t *testing.T) {
+ config := &config.Config{}
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespaceName := "public/default"
+
+ // Initial state: policy not configured, should return nil
+ retention, err := admin.Namespaces().GetRetention(namespaceName)
+ assert.NoError(t, err)
+ assert.Nil(t, retention, "Expected nil when retention is not
configured")
+
+ // Set new retention policy
+ newRetention := utils.RetentionPolicies{
+ RetentionSizeInMB: 1024,
+ RetentionTimeInMinutes: 60,
+ }
+ err = admin.Namespaces().SetRetention(namespaceName, newRetention)
+ assert.NoError(t, err)
+
+ // Verify retention is set
+ retention, err = admin.Namespaces().GetRetention(namespaceName)
+ assert.NoError(t, err)
+ assert.NotNil(t, retention, "Expected non-nil when retention is
configured")
+ assert.Equal(t, int64(1024), retention.RetentionSizeInMB)
+ assert.Equal(t, 60, retention.RetentionTimeInMinutes)
+}
+
+func TestNamespaces_BookieAffinityGroup(t *testing.T) {
+ readFile, err :=
os.ReadFile("../../../integration-tests/tokens/admin-token")
+ require.NoError(t, err)
+
+ config := &config.Config{
+ Token: string(readFile),
+ }
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespaceName := "public/default"
+
+ // Initial state: policy not configured, should return nil
+ bookieAffinity, err :=
admin.Namespaces().GetBookieAffinityGroup(namespaceName)
+ assert.NoError(t, err)
+ assert.Nil(t, bookieAffinity, "Expected nil when bookie affinity group
is not configured")
+
+ // Set new bookie affinity group
+ newBookieAffinity := utils.BookieAffinityGroupData{
+ BookkeeperAffinityGroupPrimary: "primary-group",
+ BookkeeperAffinityGroupSecondary: "secondary-group",
+ }
+ err = admin.Namespaces().SetBookieAffinityGroup(namespaceName,
newBookieAffinity)
+ assert.NoError(t, err)
+
+ // Verify bookie affinity group is set
+ bookieAffinity, err =
admin.Namespaces().GetBookieAffinityGroup(namespaceName)
+ assert.NoError(t, err)
+ assert.NotNil(t, bookieAffinity, "Expected non-nil when bookie affinity
group is configured")
+ assert.Equal(t, "primary-group",
bookieAffinity.BookkeeperAffinityGroupPrimary)
+ assert.Equal(t, "secondary-group",
bookieAffinity.BookkeeperAffinityGroupSecondary)
+
+ // Remove bookie affinity group - should return nil
+ err = admin.Namespaces().DeleteBookieAffinityGroup(namespaceName)
+ assert.NoError(t, err)
+ bookieAffinity, err =
admin.Namespaces().GetBookieAffinityGroup(namespaceName)
+ assert.NoError(t, err)
+ assert.Nil(t, bookieAffinity, "Expected nil after removing bookie
affinity group")
+}
+
+func TestNamespaces_Persistence(t *testing.T) {
+ config := &config.Config{}
+ admin, err := New(config)
+ require.NoError(t, err)
+ require.NotNil(t, admin)
+
+ namespaceName := "public/default"
+
+ // Initial state: policy not configured, should return nil
+ persistence, err := admin.Namespaces().GetPersistence(namespaceName)
+ assert.NoError(t, err)
+ assert.Nil(t, persistence, "Expected nil when persistence is not
configured")
+
+ // Set new persistence policy
+ newPersistence := utils.PersistencePolicies{
+ BookkeeperEnsemble: 1,
+ BookkeeperWriteQuorum: 1,
+ BookkeeperAckQuorum: 1,
+ ManagedLedgerMaxMarkDeleteRate: 10.0,
+ }
+ err = admin.Namespaces().SetPersistence(namespaceName, newPersistence)
+ assert.NoError(t, err)
+
+ // Verify persistence is set
+ persistence, err = admin.Namespaces().GetPersistence(namespaceName)
+ assert.NoError(t, err)
+ assert.NotNil(t, persistence, "Expected non-nil when persistence is
configured")
+ assert.Equal(t, 1, persistence.BookkeeperEnsemble)
+ assert.Equal(t, 1, persistence.BookkeeperWriteQuorum)
+}
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index 37bbe94a..de195138 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -543,10 +543,12 @@ type Topics interface {
// RemoveMaxUnackMessagesPerSubscriptionWithContext removes max unacked
messages policy on subscription for a topic
RemoveMaxUnackMessagesPerSubscriptionWithContext(context.Context,
utils.TopicName) error
- // GetPersistence returns the persistence policies for a topic
+ // GetPersistence returns the persistence policies for a topic.
+ // Returns nil if the persistence policy is not configured at the topic
level.
GetPersistence(utils.TopicName) (*utils.PersistenceData, error)
- // GetPersistenceWithContext returns the persistence policies for a
topic
+ // GetPersistenceWithContext returns the persistence policies for a
topic.
+ // Returns nil if the persistence policy is not configured at the topic
level.
GetPersistenceWithContext(context.Context, utils.TopicName)
(*utils.PersistenceData, error)
// SetPersistence sets the persistence policies for a topic
@@ -561,10 +563,12 @@ type Topics interface {
// RemovePersistenceWithContext removes the persistence policies for a
topic
RemovePersistenceWithContext(context.Context, utils.TopicName) error
- // GetDelayedDelivery returns the delayed delivery policy for a topic
+ // GetDelayedDelivery returns the delayed delivery policy for a topic.
+ // Returns nil if the delayed delivery policy is not configured at the
topic level.
GetDelayedDelivery(utils.TopicName) (*utils.DelayedDeliveryData, error)
- // GetDelayedDeliveryWithContext returns the delayed delivery policy
for a topic
+ // GetDelayedDeliveryWithContext returns the delayed delivery policy
for a topic.
+ // Returns nil if the delayed delivery policy is not configured at the
topic level.
GetDelayedDeliveryWithContext(context.Context, utils.TopicName)
(*utils.DelayedDeliveryData, error)
// SetDelayedDelivery sets the delayed delivery policy on a topic
@@ -579,10 +583,12 @@ type Topics interface {
// RemoveDelayedDeliveryWithContext removes the delayed delivery policy
on a topic
RemoveDelayedDeliveryWithContext(context.Context, utils.TopicName) error
- // GetDispatchRate returns message dispatch rate for a topic
+ // GetDispatchRate returns message dispatch rate for a topic.
+ // Returns nil if the dispatch rate is not configured at the topic
level.
GetDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
- // GetDispatchRateWithContext returns message dispatch rate for a topic
+ // GetDispatchRateWithContext returns message dispatch rate for a topic.
+ // Returns nil if the dispatch rate is not configured at the topic
level.
GetDispatchRateWithContext(context.Context, utils.TopicName)
(*utils.DispatchRateData, error)
// SetDispatchRate sets message dispatch rate for a topic
@@ -597,10 +603,12 @@ type Topics interface {
// RemoveDispatchRateWithContext removes message dispatch rate for a
topic
RemoveDispatchRateWithContext(context.Context, utils.TopicName) error
- // GetPublishRate returns message publish rate for a topic
+ // GetPublishRate returns message publish rate for a topic.
+ // Returns nil if the publish rate is not configured at the topic level.
GetPublishRate(utils.TopicName) (*utils.PublishRateData, error)
- // GetPublishRateWithContext returns message publish rate for a topic
+ // GetPublishRateWithContext returns message publish rate for a topic.
+ // Returns nil if the publish rate is not configured at the topic level.
GetPublishRateWithContext(context.Context, utils.TopicName)
(*utils.PublishRateData, error)
// SetPublishRate sets message publish rate for a topic
@@ -645,7 +653,8 @@ type Topics interface {
// RemoveDeduplicationStatusWithContext removes the deduplication
policy for a topic
RemoveDeduplicationStatusWithContext(context.Context, utils.TopicName)
error
- // GetRetention returns the retention configuration for a topic
+ // GetRetention returns the retention configuration for a topic.
+ // Returns nil if the retention policy is not configured at the topic
level.
//
// @param topic
// topicName struct
@@ -654,7 +663,8 @@ type Topics interface {
// in namespace or broker level, if no policy set in topic level
GetRetention(topic utils.TopicName, applied bool)
(*utils.RetentionPolicies, error)
- // GetRetentionWithContext returns the retention configuration for a
topic
+ // GetRetentionWithContext returns the retention configuration for a
topic.
+ // Returns nil if the retention policy is not configured at the topic
level.
//
// @param ctx
// context used for the request
@@ -821,10 +831,12 @@ type Topics interface {
// list of replication cluster id
SetReplicationClustersWithContext(ctx context.Context, topic
utils.TopicName, data []string) error
- // GetSubscribeRate returns subscribe rate configuration for a topic
+ // GetSubscribeRate returns subscribe rate configuration for a topic.
+ // Returns nil if the subscribe rate is not configured at the topic
level.
GetSubscribeRate(utils.TopicName) (*utils.SubscribeRate, error)
- // GetSubscribeRateWithContext returns subscribe rate configuration for
a topic
+ // GetSubscribeRateWithContext returns subscribe rate configuration for
a topic.
+ // Returns nil if the subscribe rate is not configured at the topic
level.
GetSubscribeRateWithContext(context.Context, utils.TopicName)
(*utils.SubscribeRate, error)
// SetSubscribeRate sets subscribe rate configuration for a topic
@@ -839,10 +851,12 @@ type Topics interface {
// RemoveSubscribeRateWithContext removes subscribe rate configuration
for a topic
RemoveSubscribeRateWithContext(context.Context, utils.TopicName) error
- // GetSubscriptionDispatchRate returns subscription dispatch rate for a
topic
+ // GetSubscriptionDispatchRate returns subscription dispatch rate for a
topic.
+ // Returns nil if the subscription dispatch rate is not configured at
the topic level.
GetSubscriptionDispatchRate(utils.TopicName) (*utils.DispatchRateData,
error)
- // GetSubscriptionDispatchRateWithContext returns subscription dispatch
rate for a topic
+ // GetSubscriptionDispatchRateWithContext returns subscription dispatch
rate for a topic.
+ // Returns nil if the subscription dispatch rate is not configured at
the topic level.
GetSubscriptionDispatchRateWithContext(context.Context,
utils.TopicName) (*utils.DispatchRateData, error)
// SetSubscriptionDispatchRate sets subscription dispatch rate for a
topic
@@ -948,10 +962,12 @@ type Topics interface {
// RemoveDeduplicationSnapshotIntervalWithContext removes deduplication
snapshot interval for a topic
RemoveDeduplicationSnapshotIntervalWithContext(context.Context,
utils.TopicName) error
- // GetReplicatorDispatchRate returns replicator dispatch rate for a
topic
+ // GetReplicatorDispatchRate returns replicator dispatch rate for a
topic.
+ // Returns nil if the replicator dispatch rate is not configured at the
topic level.
GetReplicatorDispatchRate(utils.TopicName) (*utils.DispatchRateData,
error)
- // GetReplicatorDispatchRateWithContext returns replicator dispatch
rate for a topic
+ // GetReplicatorDispatchRateWithContext returns replicator dispatch
rate for a topic.
+ // Returns nil if the replicator dispatch rate is not configured at the
topic level.
GetReplicatorDispatchRateWithContext(context.Context, utils.TopicName)
(*utils.DispatchRateData, error)
// SetReplicatorDispatchRate sets replicator dispatch rate for a topic
@@ -966,10 +982,12 @@ type Topics interface {
// RemoveReplicatorDispatchRateWithContext removes replicator dispatch
rate for a topic
RemoveReplicatorDispatchRateWithContext(context.Context,
utils.TopicName) error
- // GetOffloadPolicies returns offload policies for a topic
+ // GetOffloadPolicies returns offload policies for a topic.
+ // Returns nil if the offload policies are not configured at the topic
level.
GetOffloadPolicies(utils.TopicName) (*utils.OffloadPolicies, error)
- // GetOffloadPoliciesWithContext returns offload policies for a topic
+ // GetOffloadPoliciesWithContext returns offload policies for a topic.
+ // Returns nil if the offload policies are not configured at the topic
level.
GetOffloadPoliciesWithContext(context.Context, utils.TopicName)
(*utils.OffloadPolicies, error)
// SetOffloadPolicies sets offload policies for a topic
@@ -984,10 +1002,12 @@ type Topics interface {
// RemoveOffloadPoliciesWithContext removes offload policies for a topic
RemoveOffloadPoliciesWithContext(context.Context, utils.TopicName) error
- // GetAutoSubscriptionCreation returns auto subscription creation
override for a topic
+ // GetAutoSubscriptionCreation returns auto subscription creation
override for a topic.
+ // Returns nil if the auto subscription creation override is not
configured at the topic level.
GetAutoSubscriptionCreation(utils.TopicName)
(*utils.AutoSubscriptionCreationOverride, error)
- // GetAutoSubscriptionCreationWithContext returns auto subscription
creation override for a topic
+ // GetAutoSubscriptionCreationWithContext returns auto subscription
creation override for a topic.
+ // Returns nil if the auto subscription creation override is not
configured at the topic level.
GetAutoSubscriptionCreationWithContext(
context.Context,
utils.TopicName,
@@ -1629,8 +1649,11 @@ func (t *topics) GetPersistence(topic utils.TopicName)
(*utils.PersistenceData,
func (t *topics) GetPersistenceWithContext(ctx context.Context, topic
utils.TopicName) (*utils.PersistenceData, error) {
var persistenceData utils.PersistenceData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"persistence")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &persistenceData)
- return &persistenceData, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&persistenceData)
+ if body != nil {
+ return &persistenceData, err
+ }
+ return nil, err
}
func (t *topics) SetPersistence(topic utils.TopicName, persistenceData
utils.PersistenceData) error {
@@ -1665,8 +1688,11 @@ func (t *topics) GetDelayedDeliveryWithContext(
) (*utils.DelayedDeliveryData, error) {
var delayedDeliveryData utils.DelayedDeliveryData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"delayedDelivery")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint,
&delayedDeliveryData)
- return &delayedDeliveryData, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&delayedDeliveryData)
+ if body != nil {
+ return &delayedDeliveryData, err
+ }
+ return nil, err
}
func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData
utils.DelayedDeliveryData) error {
@@ -1701,8 +1727,11 @@ func (t *topics) GetDispatchRateWithContext(
) (*utils.DispatchRateData, error) {
var dispatchRateData utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"dispatchRate")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRateData)
- return &dispatchRateData, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&dispatchRateData)
+ if body != nil {
+ return &dispatchRateData, err
+ }
+ return nil, err
}
func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData
utils.DispatchRateData) error {
@@ -1734,8 +1763,11 @@ func (t *topics) GetPublishRate(topic utils.TopicName)
(*utils.PublishRateData,
func (t *topics) GetPublishRateWithContext(ctx context.Context, topic
utils.TopicName) (*utils.PublishRateData, error) {
var publishRateData utils.PublishRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"publishRate")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &publishRateData)
- return &publishRateData, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&publishRateData)
+ if body != nil {
+ return &publishRateData, err
+ }
+ return nil, err
}
func (t *topics) SetPublishRate(topic utils.TopicName, publishRateData
utils.PublishRateData) error {
@@ -1800,10 +1832,13 @@ func (t *topics) GetRetentionWithContext(
) (*utils.RetentionPolicies, error) {
var policy utils.RetentionPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"retention")
- _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint,
&policy, map[string]string{
+ body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx,
endpoint, &policy, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
- return &policy, err
+ if body != nil {
+ return &policy, err
+ }
+ return nil, err
}
func (t *topics) RemoveRetention(topic utils.TopicName) error {
@@ -1979,8 +2014,11 @@ func (t *topics) GetSubscribeRate(topic utils.TopicName)
(*utils.SubscribeRate,
func (t *topics) GetSubscribeRateWithContext(ctx context.Context, topic
utils.TopicName) (*utils.SubscribeRate, error) {
var subscribeRate utils.SubscribeRate
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"subscribeRate")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &subscribeRate)
- return &subscribeRate, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&subscribeRate)
+ if body != nil {
+ return &subscribeRate, err
+ }
+ return nil, err
}
func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate
utils.SubscribeRate) error {
@@ -2015,8 +2053,11 @@ func (t *topics) GetSubscriptionDispatchRateWithContext(
) (*utils.DispatchRateData, error) {
var dispatchRate utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"subscriptionDispatchRate")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRate)
- return &dispatchRate, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&dispatchRate)
+ if body != nil {
+ return &dispatchRate, err
+ }
+ return nil, err
}
func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName,
dispatchRate utils.DispatchRateData) error {
@@ -2212,8 +2253,11 @@ func (t *topics) GetReplicatorDispatchRateWithContext(
) (*utils.DispatchRateData, error) {
var dispatchRate utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"replicatorDispatchRate")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRate)
- return &dispatchRate, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&dispatchRate)
+ if body != nil {
+ return &dispatchRate, err
+ }
+ return nil, err
}
func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate
utils.DispatchRateData) error {
@@ -2248,8 +2292,11 @@ func (t *topics) GetAutoSubscriptionCreationWithContext(
) (*utils.AutoSubscriptionCreationOverride, error) {
var autoSubCreation utils.AutoSubscriptionCreationOverride
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"autoSubscriptionCreation")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &autoSubCreation)
- return &autoSubCreation, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&autoSubCreation)
+ if body != nil {
+ return &autoSubCreation, err
+ }
+ return nil, err
}
func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
@@ -2316,8 +2363,11 @@ func (t *topics) GetOffloadPoliciesWithContext(
) (*utils.OffloadPolicies, error) {
var offloadPolicies utils.OffloadPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"offloadPolicies")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &offloadPolicies)
- return &offloadPolicies, err
+ body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint,
&offloadPolicies)
+ if body != nil {
+ return &offloadPolicies, err
+ }
+ return nil, err
}
func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies
utils.OffloadPolicies) error {
diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
index 8493cf76..f40c0bbc 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -516,10 +516,10 @@ func TestRetention(t *testing.T) {
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
+ // Initial state: policy not configured, should return nil
topicRetentionPolicy, err := admin.Topics().GetRetention(*topicName,
false)
assert.NoError(t, err)
- assert.Equal(t, int64(0), topicRetentionPolicy.RetentionSizeInMB)
- assert.Equal(t, 0, topicRetentionPolicy.RetentionTimeInMinutes)
+ assert.Nil(t, topicRetentionPolicy, "Expected nil when retention policy
is not configured")
err = admin.Topics().SetRetention(*topicName, utils.RetentionPolicies{
RetentionSizeInMB: 20480,
RetentionTimeInMinutes: 1440,
@@ -540,13 +540,12 @@ func TestRetention(t *testing.T) {
)
err = admin.Topics().RemoveRetention(*topicName)
assert.NoError(t, err)
+ // After removal, should return nil (not configured)
assert.Eventually(
t,
func() bool {
topicRetentionPolicy, err =
admin.Topics().GetRetention(*topicName, false)
- return err == nil &&
- topicRetentionPolicy.RetentionSizeInMB ==
int64(0) &&
- topicRetentionPolicy.RetentionTimeInMinutes == 0
+ return err == nil && topicRetentionPolicy == nil
},
10*time.Second,
100*time.Millisecond,
@@ -565,12 +564,10 @@ func TestSubscribeRate(t *testing.T) {
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
- // Get default subscribe rate (adapt to actual server behavior)
+ // Initial state: policy not configured, should return nil
initialSubscribeRate, err := admin.Topics().GetSubscribeRate(*topicName)
assert.NoError(t, err)
- // Store initial values for later comparison instead of assuming
specific defaults
- initialConsumerRate :=
initialSubscribeRate.SubscribeThrottlingRatePerConsumer
- initialRatePeriod := initialSubscribeRate.RatePeriodInSecond
+ assert.Nil(t, initialSubscribeRate, "Expected nil when subscribe rate
is not configured")
// Set new subscribe rate
newSubscribeRate := utils.SubscribeRate{
@@ -594,16 +591,14 @@ func TestSubscribeRate(t *testing.T) {
100*time.Millisecond,
)
- // Remove subscribe rate policy
+ // Remove subscribe rate policy - should return nil
err = admin.Topics().RemoveSubscribeRate(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
subscribeRate, err :=
admin.Topics().GetSubscribeRate(*topicName)
- return err == nil &&
-
subscribeRate.SubscribeThrottlingRatePerConsumer == initialConsumerRate &&
- subscribeRate.RatePeriodInSecond ==
initialRatePeriod
+ return err == nil && subscribeRate == nil
},
10*time.Second,
100*time.Millisecond,
@@ -622,14 +617,10 @@ func TestSubscriptionDispatchRate(t *testing.T) {
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
- // Get default subscription dispatch rate (adapt to actual server
behavior)
+ // Initial state: policy not configured, should return nil
initialDispatchRate, err :=
admin.Topics().GetSubscriptionDispatchRate(*topicName)
assert.NoError(t, err)
- // Store initial values for later comparison instead of assuming
specific defaults
- initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg
- initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte
- initialRatePeriod := initialDispatchRate.RatePeriodInSecond
- initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate
+ assert.Nil(t, initialDispatchRate, "Expected nil when subscription
dispatch rate is not configured")
// Set new subscription dispatch rate
newDispatchRate := utils.DispatchRateData{
@@ -657,18 +648,14 @@ func TestSubscriptionDispatchRate(t *testing.T) {
100*time.Millisecond,
)
- // Remove subscription dispatch rate policy
+ // Remove subscription dispatch rate policy - should return nil
err = admin.Topics().RemoveSubscriptionDispatchRate(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
dispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName)
- return err == nil &&
- dispatchRate.DispatchThrottlingRateInMsg == initialMsgRate &&
- dispatchRate.DispatchThrottlingRateInByte == initialByteRate &&
- dispatchRate.RatePeriodInSecond == initialRatePeriod &&
- dispatchRate.RelativeToPublishRate == initialRelativeToPublish
+ return err == nil && dispatchRate == nil
},
10*time.Second,
100*time.Millisecond,
@@ -943,14 +930,10 @@ func TestReplicatorDispatchRate(t *testing.T) {
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
- // Get default replicator dispatch rate (adapt to actual server behavior)
+ // Initial state: policy not configured, should return nil
initialDispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
assert.NoError(t, err)
- // Store initial values for later comparison instead of assuming specific defaults
- initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg
- initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte
- initialRatePeriod := initialDispatchRate.RatePeriodInSecond
- initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate
+ assert.Nil(t, initialDispatchRate, "Expected nil when replicator dispatch rate is not configured")
// Set new replicator dispatch rate
newDispatchRate := utils.DispatchRateData{
@@ -978,18 +961,14 @@ func TestReplicatorDispatchRate(t *testing.T) {
100*time.Millisecond,
)
- // Remove replicator dispatch rate policy
+ // Remove replicator dispatch rate policy - should return nil
err = admin.Topics().RemoveReplicatorDispatchRate(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
dispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName)
- return err == nil &&
- dispatchRate.DispatchThrottlingRateInMsg == initialMsgRate &&
- dispatchRate.DispatchThrottlingRateInByte == initialByteRate &&
- dispatchRate.RatePeriodInSecond == initialRatePeriod &&
- dispatchRate.RelativeToPublishRate == initialRelativeToPublish
+ return err == nil && dispatchRate == nil
},
10*time.Second,
100*time.Millisecond,
@@ -1008,12 +987,10 @@ func TestOffloadPolicies(t *testing.T) {
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
- // Get default offload policies
+ // Initial state: policy not configured, should return nil
offloadPolicies, err := admin.Topics().GetOffloadPolicies(*topicName)
assert.NoError(t, err)
- // Default values should be empty/default
- assert.Equal(t, "", offloadPolicies.ManagedLedgerOffloadDriver)
- assert.Equal(t, 0, offloadPolicies.ManagedLedgerOffloadMaxThreads)
+ assert.Nil(t, offloadPolicies, "Expected nil when offload policies are not configured")
// Set new offload policies
newOffloadPolicies := utils.OffloadPolicies{
@@ -1045,16 +1022,14 @@ func TestOffloadPolicies(t *testing.T) {
100*time.Millisecond,
)
- // Remove offload policies
+ // Remove offload policies - should return nil
err = admin.Topics().RemoveOffloadPolicies(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
offloadPolicies, err = admin.Topics().GetOffloadPolicies(*topicName)
- return err == nil &&
- offloadPolicies.ManagedLedgerOffloadDriver == "" &&
- offloadPolicies.ManagedLedgerOffloadMaxThreads == 0
+ return err == nil && offloadPolicies == nil
},
10*time.Second,
100*time.Millisecond,
@@ -1073,10 +1048,10 @@ func TestAutoSubscriptionCreation(t *testing.T) {
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
- // Get default auto subscription creation
+ // Initial state: policy not configured, should return nil
autoSubCreation, err := admin.Topics().GetAutoSubscriptionCreation(*topicName)
assert.NoError(t, err)
- assert.Equal(t, false, autoSubCreation.AllowAutoSubscriptionCreation)
+ assert.Nil(t, autoSubCreation, "Expected nil when auto subscription creation is not configured")
// Set auto subscription creation to true
newAutoSubCreation := utils.AutoSubscriptionCreationOverride{
@@ -1098,15 +1073,14 @@ func TestAutoSubscriptionCreation(t *testing.T) {
100*time.Millisecond,
)
- // Remove auto subscription creation policy
+ // Remove auto subscription creation policy - should return nil
err = admin.Topics().RemoveAutoSubscriptionCreation(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
autoSubCreation, err = admin.Topics().GetAutoSubscriptionCreation(*topicName)
- return err == nil &&
- autoSubCreation.AllowAutoSubscriptionCreation == false
+ return err == nil && autoSubCreation == nil
},
10*time.Second,
100*time.Millisecond,
@@ -1369,3 +1343,214 @@ func TestTopics_MaxUnackMessagesPerSubscription(t *testing.T) {
err = admin.Topics().RemoveMaxUnackMessagesPerSubscription(*topicName)
assert.NoError(t, err)
}
+
+func TestTopics_Persistence(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Initial state: policy not configured, should return nil
+ persistence, err := admin.Topics().GetPersistence(*topicName)
+ assert.NoError(t, err)
+ assert.Nil(t, persistence, "Expected nil when persistence is not configured")
+
+ // Set new persistence policy
+ newPersistence := utils.PersistenceData{
+ BookkeeperEnsemble: 1,
+ BookkeeperWriteQuorum: 1,
+ BookkeeperAckQuorum: 1,
+ ManagedLedgerMaxMarkDeleteRate: 10.0,
+ }
+ err = admin.Topics().SetPersistence(*topicName, newPersistence)
+ assert.NoError(t, err)
+
+ // Verify persistence is set
+ assert.Eventually(
+ t,
+ func() bool {
+ persistence, err = admin.Topics().GetPersistence(*topicName)
+ return err == nil && persistence != nil &&
+ persistence.BookkeeperEnsemble == 1 &&
+ persistence.BookkeeperWriteQuorum == 1
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove persistence policy - should return nil
+ err = admin.Topics().RemovePersistence(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ persistence, err = admin.Topics().GetPersistence(*topicName)
+ return err == nil && persistence == nil
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestTopics_DelayedDelivery(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Initial state: policy not configured, should return nil
+ delayedDelivery, err := admin.Topics().GetDelayedDelivery(*topicName)
+ assert.NoError(t, err)
+ assert.Nil(t, delayedDelivery, "Expected nil when delayed delivery is not configured")
+
+ // Set new delayed delivery policy
+ newDelayedDelivery := utils.DelayedDeliveryData{
+ Active: true,
+ TickTime: 1000,
+ MaxDelayInMillis: 60000,
+ }
+ err = admin.Topics().SetDelayedDelivery(*topicName, newDelayedDelivery)
+ assert.NoError(t, err)
+
+ // Verify delayed delivery is set
+ assert.Eventually(
+ t,
+ func() bool {
+ delayedDelivery, err = admin.Topics().GetDelayedDelivery(*topicName)
+ return err == nil && delayedDelivery != nil &&
+ delayedDelivery.Active == true &&
+ delayedDelivery.TickTime == 1000
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove delayed delivery policy - should return nil
+ err = admin.Topics().RemoveDelayedDelivery(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ delayedDelivery, err = admin.Topics().GetDelayedDelivery(*topicName)
+ return err == nil && delayedDelivery == nil
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestTopics_DispatchRate(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Initial state: policy not configured, should return nil
+ dispatchRate, err := admin.Topics().GetDispatchRate(*topicName)
+ assert.NoError(t, err)
+ assert.Nil(t, dispatchRate, "Expected nil when dispatch rate is not configured")
+
+ // Set new dispatch rate
+ newDispatchRate := utils.DispatchRateData{
+ DispatchThrottlingRateInMsg: 100,
+ DispatchThrottlingRateInByte: 1048576,
+ RatePeriodInSecond: 60,
+ RelativeToPublishRate: false,
+ }
+ err = admin.Topics().SetDispatchRate(*topicName, newDispatchRate)
+ assert.NoError(t, err)
+
+ // Verify dispatch rate is set
+ assert.Eventually(
+ t,
+ func() bool {
+ dispatchRate, err = admin.Topics().GetDispatchRate(*topicName)
+ return err == nil && dispatchRate != nil &&
+ dispatchRate.DispatchThrottlingRateInMsg == 100
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove dispatch rate policy - should return nil
+ err = admin.Topics().RemoveDispatchRate(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ dispatchRate, err = admin.Topics().GetDispatchRate(*topicName)
+ return err == nil && dispatchRate == nil
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestTopics_PublishRate(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Initial state: policy not configured, should return nil
+ publishRate, err := admin.Topics().GetPublishRate(*topicName)
+ assert.NoError(t, err)
+ assert.Nil(t, publishRate, "Expected nil when publish rate is not configured")
+
+ // Set new publish rate
+ newPublishRate := utils.PublishRateData{
+ PublishThrottlingRateInMsg: 200,
+ PublishThrottlingRateInByte: 2097152,
+ }
+ err = admin.Topics().SetPublishRate(*topicName, newPublishRate)
+ assert.NoError(t, err)
+
+ // Verify publish rate is set
+ assert.Eventually(
+ t,
+ func() bool {
+ publishRate, err = admin.Topics().GetPublishRate(*topicName)
+ return err == nil && publishRate != nil &&
+ publishRate.PublishThrottlingRateInMsg == 200
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+
+ // Remove publish rate policy - should return nil
+ err = admin.Topics().RemovePublishRate(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ publishRate, err = admin.Topics().GetPublishRate(*topicName)
+ return err == nil && publishRate == nil
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
diff --git a/pulsaradmin/pkg/rest/client.go b/pulsaradmin/pkg/rest/client.go
index ffb0c29a..82e460b1 100644
--- a/pulsaradmin/pkg/rest/client.go
+++ b/pulsaradmin/pkg/rest/client.go
@@ -141,6 +141,10 @@ func (c *Client) GetWithContext(ctx context.Context, endpoint string, obj interf
return err
}
+func (c *Client) GetBodyWithContext(ctx context.Context, endpoint string, obj interface{}) ([]byte, error) {
+ return c.GetWithQueryParamsWithContext(ctx, endpoint, obj, nil, true)
+}
+
func (c *Client) GetWithQueryParams(endpoint string, obj interface{}, params map[string]string,
decode bool) ([]byte, error) {
return c.GetWithQueryParamsWithContext(context.Background(), endpoint, obj, params, decode)
@@ -190,12 +194,14 @@ func (c *Client) GetWithOptionsWithContext(
defer safeRespClose(resp)
if obj != nil {
- if err := decodeJSONBody(resp, &obj); err != nil {
+ body, err := decodeJSONWithBody(resp, &obj)
+ if err != nil {
if err == io.EOF {
return nil, nil
}
return nil, err
}
+ return body, nil
} else if !decode {
if file != nil {
_, err := io.Copy(file, resp.Body)
@@ -533,6 +539,25 @@ func decodeJSONBody(resp *http.Response, out interface{}) error {
return dec.Decode(out)
}
+// decodeJSONWithBody is used to JSON decode a body AND ALSO return the raw body bytes
+func decodeJSONWithBody(resp *http.Response, out interface{}) ([]byte, error) {
+ // Read the body first so we can return it even after decoding
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(body) == 0 {
+ return nil, nil
+ }
+
+ if err := json.Unmarshal(body, &out); err != nil {
+ return nil, err
+ }
+
+ return body, nil
+}
+
// safeRespClose is used to close a response body
func safeRespClose(resp *http.Response) {
if resp != nil {