This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 50efe422 Update namespace & topic admin methods to return nil if unset 
(#1433)
50efe422 is described below

commit 50efe422e74f98ddf93b2dbf124cb316ec2d92e2
Author: Kai <[email protected]>
AuthorDate: Thu Oct 30 01:06:18 2025 -0700

    Update namespace & topic admin methods to return nil if unset (#1433)
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: crossoverJie <[email protected]>
---
 pulsaradmin/pkg/admin/namespace.go      |  52 ++++--
 pulsaradmin/pkg/admin/namespace_test.go | 111 ++++++++++++-
 pulsaradmin/pkg/admin/topic.go          | 130 ++++++++++-----
 pulsaradmin/pkg/admin/topic_test.go     | 285 ++++++++++++++++++++++++++------
 pulsaradmin/pkg/rest/client.go          |  27 ++-
 5 files changed, 492 insertions(+), 113 deletions(-)

diff --git a/pulsaradmin/pkg/admin/namespace.go 
b/pulsaradmin/pkg/admin/namespace.go
index 6114f5ce..d5d33f12 100644
--- a/pulsaradmin/pkg/admin/namespace.go
+++ b/pulsaradmin/pkg/admin/namespace.go
@@ -94,10 +94,12 @@ type Namespaces interface {
        // GetNamespaceMessageTTLWithContext returns the message TTL for a 
namespace. Returns -1 if not set
        GetNamespaceMessageTTLWithContext(ctx context.Context, namespace 
string) (int, error)
 
-       // GetRetention returns the retention configuration for a namespace
+       // GetRetention returns the retention configuration for a namespace.
+       // Returns nil if the retention policy is not configured at the 
namespace level.
        GetRetention(namespace string) (*utils.RetentionPolicies, error)
 
-       // GetRetentionWithContext returns the retention configuration for a 
namespace
+       // GetRetentionWithContext returns the retention configuration for a 
namespace.
+       // Returns nil if the retention policy is not configured at the 
namespace level.
        GetRetentionWithContext(ctx context.Context, namespace string) 
(*utils.RetentionPolicies, error)
 
        // SetRetention sets the retention configuration for all the topics on 
a namespace
@@ -132,10 +134,12 @@ type Namespaces interface {
        // RemoveBacklogQuotaWithContext removes a backlog quota policy from a 
namespace
        RemoveBacklogQuotaWithContext(ctx context.Context, namespace string) 
error
 
-       // GetTopicAutoCreation returns the topic auto-creation config for a 
namespace
+       // GetTopicAutoCreation returns the topic auto-creation config for a 
namespace.
+       // Returns nil if the topic auto-creation config is not configured at 
the namespace level.
        GetTopicAutoCreation(namespace utils.NameSpaceName) 
(*utils.TopicAutoCreationConfig, error)
 
-       // GetTopicAutoCreationWithContext returns the topic auto-creation 
config for a namespace
+       // GetTopicAutoCreationWithContext returns the topic auto-creation 
config for a namespace.
+       // Returns nil if the topic auto-creation config is not configured at 
the namespace level.
        GetTopicAutoCreationWithContext(
                ctx context.Context,
                namespace utils.NameSpaceName,
@@ -392,10 +396,12 @@ type Namespaces interface {
        // SetPersistenceWithContext sets the persistence configuration for all 
the topics on a namespace
        SetPersistenceWithContext(ctx context.Context, namespace string, 
persistence utils.PersistencePolicies) error
 
-       // GetPersistence returns the persistence configuration for a namespace
+       // GetPersistence returns the persistence configuration for a namespace.
+       // Returns nil if the persistence policy is not configured at the 
namespace level.
        GetPersistence(namespace string) (*utils.PersistencePolicies, error)
 
-       // GetPersistenceWithContext returns the persistence configuration for 
a namespace
+       // GetPersistenceWithContext returns the persistence configuration for 
a namespace.
+       // Returns nil if the persistence policy is not configured at the 
namespace level.
        GetPersistenceWithContext(ctx context.Context, namespace string) 
(*utils.PersistencePolicies, error)
 
        // SetBookieAffinityGroup sets bookie affinity group for a namespace to 
isolate namespace write to bookies that are
@@ -416,10 +422,12 @@ type Namespaces interface {
        // DeleteBookieAffinityGroupWithContext deletes bookie affinity group 
configured for a namespace
        DeleteBookieAffinityGroupWithContext(ctx context.Context, namespace 
string) error
 
-       // GetBookieAffinityGroup returns bookie affinity group configured for 
a namespace
+       // GetBookieAffinityGroup returns bookie affinity group configured for 
a namespace.
+       // Returns nil if the bookie affinity group is not configured at the 
namespace level.
        GetBookieAffinityGroup(namespace string) 
(*utils.BookieAffinityGroupData, error)
 
-       // GetBookieAffinityGroupWithContext returns bookie affinity group 
configured for a namespace
+       // GetBookieAffinityGroupWithContext returns bookie affinity group 
configured for a namespace.
+       // Returns nil if the bookie affinity group is not configured at the 
namespace level.
        GetBookieAffinityGroupWithContext(ctx context.Context, namespace 
string) (*utils.BookieAffinityGroupData, error)
 
        // Unload a namespace from the current serving broker
@@ -905,8 +913,11 @@ func (n *namespaces) GetRetentionWithContext(ctx 
context.Context, namespace stri
                return nil, err
        }
        endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "retention")
-       err = n.pulsar.Client.GetWithContext(ctx, endpoint, &policy)
-       return &policy, err
+       body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &policy)
+       if body != nil {
+               return &policy, err
+       }
+       return nil, err
 }
 
 func (n *namespaces) GetBacklogQuotaMap(namespace string) 
(map[utils.BacklogQuotaType]utils.BacklogQuota, error) {
@@ -970,8 +981,11 @@ func (n *namespaces) GetTopicAutoCreationWithContext(
 ) (*utils.TopicAutoCreationConfig, error) {
        var topicAutoCreation utils.TopicAutoCreationConfig
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"autoTopicCreation")
-       err := n.pulsar.Client.GetWithContext(ctx, endpoint, &topicAutoCreation)
-       return &topicAutoCreation, err
+       body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&topicAutoCreation)
+       if body != nil {
+               return &topicAutoCreation, err
+       }
+       return nil, err
 }
 
 func (n *namespaces) SetTopicAutoCreation(namespace utils.NameSpaceName, 
config utils.TopicAutoCreationConfig) error {
@@ -1463,8 +1477,11 @@ func (n *namespaces) GetBookieAffinityGroupWithContext(
                return nil, err
        }
        endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), 
"persistence", "bookieAffinity")
-       err = n.pulsar.Client.GetWithContext(ctx, endpoint, &data)
-       return &data, err
+       body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &data)
+       if body != nil {
+               return &data, err
+       }
+       return nil, err
 }
 
 func (n *namespaces) GetPersistence(namespace string) 
(*utils.PersistencePolicies, error) {
@@ -1481,8 +1498,11 @@ func (n *namespaces) GetPersistenceWithContext(
                return nil, err
        }
        endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), 
"persistence")
-       err = n.pulsar.Client.GetWithContext(ctx, endpoint, &persistence)
-       return &persistence, err
+       body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&persistence)
+       if body != nil {
+               return &persistence, err
+       }
+       return nil, err
 }
 
 func (n *namespaces) Unload(namespace string) error {
diff --git a/pulsaradmin/pkg/admin/namespace_test.go 
b/pulsaradmin/pkg/admin/namespace_test.go
index cfa44951..1ca235f5 100644
--- a/pulsaradmin/pkg/admin/namespace_test.go
+++ b/pulsaradmin/pkg/admin/namespace_test.go
@@ -18,6 +18,7 @@
 package admin
 
 import (
+       "os"
        "testing"
        "time"
 
@@ -156,23 +157,20 @@ func TestGetTopicAutoCreation(t *testing.T) {
        assert.Equal(t, nil, err)
        topicAutoCreation, err := 
admin.Namespaces().GetTopicAutoCreation(*namespace)
        assert.Equal(t, nil, err)
+       assert.NotNil(t, topicAutoCreation, "Expected non-nil when topic auto 
creation is configured")
        expected := utils.TopicAutoCreationConfig{
                Allow: true,
                Type:  utils.NonPartitioned,
        }
        assert.Equal(t, expected, *topicAutoCreation)
 
-       // remove the topic auto creation config and get it
+       // remove the topic auto creation config and get it - should return nil
        err = admin.Namespaces().RemoveTopicAutoCreation(*namespace)
        assert.Equal(t, nil, err)
 
        topicAutoCreation, err = 
admin.Namespaces().GetTopicAutoCreation(*namespace)
        assert.Equal(t, nil, err)
-       expected = utils.TopicAutoCreationConfig{
-               Allow: false,
-               Type:  "",
-       }
-       assert.Equal(t, expected, *topicAutoCreation)
+       assert.Nil(t, topicAutoCreation, "Expected nil when topic auto creation 
is not configured")
 }
 
 func TestRevokeSubPermission(t *testing.T) {
@@ -763,3 +761,104 @@ func TestNamespaces_MaxProducersPerTopic(t *testing.T) {
        assert.NoError(t, err)
        assert.Equal(t, 50, maxProducers)
 }
+
+func TestNamespaces_Retention(t *testing.T) {
+       config := &config.Config{}
+       admin, err := New(config)
+       require.NoError(t, err)
+       require.NotNil(t, admin)
+
+       namespaceName := "public/default"
+
+       // Initial state: policy not configured, should return nil
+       retention, err := admin.Namespaces().GetRetention(namespaceName)
+       assert.NoError(t, err)
+       assert.Nil(t, retention, "Expected nil when retention is not 
configured")
+
+       // Set new retention policy
+       newRetention := utils.RetentionPolicies{
+               RetentionSizeInMB:      1024,
+               RetentionTimeInMinutes: 60,
+       }
+       err = admin.Namespaces().SetRetention(namespaceName, newRetention)
+       assert.NoError(t, err)
+
+       // Verify retention is set
+       retention, err = admin.Namespaces().GetRetention(namespaceName)
+       assert.NoError(t, err)
+       assert.NotNil(t, retention, "Expected non-nil when retention is 
configured")
+       assert.Equal(t, int64(1024), retention.RetentionSizeInMB)
+       assert.Equal(t, 60, retention.RetentionTimeInMinutes)
+}
+
+func TestNamespaces_BookieAffinityGroup(t *testing.T) {
+       readFile, err := 
os.ReadFile("../../../integration-tests/tokens/admin-token")
+       require.NoError(t, err)
+
+       config := &config.Config{
+               Token: string(readFile),
+       }
+       admin, err := New(config)
+       require.NoError(t, err)
+       require.NotNil(t, admin)
+
+       namespaceName := "public/default"
+
+       // Initial state: policy not configured, should return nil
+       bookieAffinity, err := 
admin.Namespaces().GetBookieAffinityGroup(namespaceName)
+       assert.NoError(t, err)
+       assert.Nil(t, bookieAffinity, "Expected nil when bookie affinity group 
is not configured")
+
+       // Set new bookie affinity group
+       newBookieAffinity := utils.BookieAffinityGroupData{
+               BookkeeperAffinityGroupPrimary:   "primary-group",
+               BookkeeperAffinityGroupSecondary: "secondary-group",
+       }
+       err = admin.Namespaces().SetBookieAffinityGroup(namespaceName, 
newBookieAffinity)
+       assert.NoError(t, err)
+
+       // Verify bookie affinity group is set
+       bookieAffinity, err = 
admin.Namespaces().GetBookieAffinityGroup(namespaceName)
+       assert.NoError(t, err)
+       assert.NotNil(t, bookieAffinity, "Expected non-nil when bookie affinity 
group is configured")
+       assert.Equal(t, "primary-group", 
bookieAffinity.BookkeeperAffinityGroupPrimary)
+       assert.Equal(t, "secondary-group", 
bookieAffinity.BookkeeperAffinityGroupSecondary)
+
+       // Remove bookie affinity group - should return nil
+       err = admin.Namespaces().DeleteBookieAffinityGroup(namespaceName)
+       assert.NoError(t, err)
+       bookieAffinity, err = 
admin.Namespaces().GetBookieAffinityGroup(namespaceName)
+       assert.NoError(t, err)
+       assert.Nil(t, bookieAffinity, "Expected nil after removing bookie 
affinity group")
+}
+
+func TestNamespaces_Persistence(t *testing.T) {
+       config := &config.Config{}
+       admin, err := New(config)
+       require.NoError(t, err)
+       require.NotNil(t, admin)
+
+       namespaceName := "public/default"
+
+       // Initial state: policy not configured, should return nil
+       persistence, err := admin.Namespaces().GetPersistence(namespaceName)
+       assert.NoError(t, err)
+       assert.Nil(t, persistence, "Expected nil when persistence is not 
configured")
+
+       // Set new persistence policy
+       newPersistence := utils.PersistencePolicies{
+               BookkeeperEnsemble:             1,
+               BookkeeperWriteQuorum:          1,
+               BookkeeperAckQuorum:            1,
+               ManagedLedgerMaxMarkDeleteRate: 10.0,
+       }
+       err = admin.Namespaces().SetPersistence(namespaceName, newPersistence)
+       assert.NoError(t, err)
+
+       // Verify persistence is set
+       persistence, err = admin.Namespaces().GetPersistence(namespaceName)
+       assert.NoError(t, err)
+       assert.NotNil(t, persistence, "Expected non-nil when persistence is 
configured")
+       assert.Equal(t, 1, persistence.BookkeeperEnsemble)
+       assert.Equal(t, 1, persistence.BookkeeperWriteQuorum)
+}
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index 37bbe94a..de195138 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -543,10 +543,12 @@ type Topics interface {
        // RemoveMaxUnackMessagesPerSubscriptionWithContext removes max unacked 
messages policy on subscription for a topic
        RemoveMaxUnackMessagesPerSubscriptionWithContext(context.Context, 
utils.TopicName) error
 
-       // GetPersistence returns the persistence policies for a topic
+       // GetPersistence returns the persistence policies for a topic.
+       // Returns nil if the persistence policy is not configured at the topic 
level.
        GetPersistence(utils.TopicName) (*utils.PersistenceData, error)
 
-       // GetPersistenceWithContext returns the persistence policies for a 
topic
+       // GetPersistenceWithContext returns the persistence policies for a 
topic.
+       // Returns nil if the persistence policy is not configured at the topic 
level.
        GetPersistenceWithContext(context.Context, utils.TopicName) 
(*utils.PersistenceData, error)
 
        // SetPersistence sets the persistence policies for a topic
@@ -561,10 +563,12 @@ type Topics interface {
        // RemovePersistenceWithContext removes the persistence policies for a 
topic
        RemovePersistenceWithContext(context.Context, utils.TopicName) error
 
-       // GetDelayedDelivery returns the delayed delivery policy for a topic
+       // GetDelayedDelivery returns the delayed delivery policy for a topic.
+       // Returns nil if the delayed delivery policy is not configured at the 
topic level.
        GetDelayedDelivery(utils.TopicName) (*utils.DelayedDeliveryData, error)
 
-       // GetDelayedDeliveryWithContext returns the delayed delivery policy 
for a topic
+       // GetDelayedDeliveryWithContext returns the delayed delivery policy 
for a topic.
+       // Returns nil if the delayed delivery policy is not configured at the 
topic level.
        GetDelayedDeliveryWithContext(context.Context, utils.TopicName) 
(*utils.DelayedDeliveryData, error)
 
        // SetDelayedDelivery sets the delayed delivery policy on a topic
@@ -579,10 +583,12 @@ type Topics interface {
        // RemoveDelayedDeliveryWithContext removes the delayed delivery policy 
on a topic
        RemoveDelayedDeliveryWithContext(context.Context, utils.TopicName) error
 
-       // GetDispatchRate returns message dispatch rate for a topic
+       // GetDispatchRate returns message dispatch rate for a topic.
+       // Returns nil if the dispatch rate is not configured at the topic 
level.
        GetDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
 
-       // GetDispatchRateWithContext returns message dispatch rate for a topic
+       // GetDispatchRateWithContext returns message dispatch rate for a topic.
+       // Returns nil if the dispatch rate is not configured at the topic 
level.
        GetDispatchRateWithContext(context.Context, utils.TopicName) 
(*utils.DispatchRateData, error)
 
        // SetDispatchRate sets message dispatch rate for a topic
@@ -597,10 +603,12 @@ type Topics interface {
        // RemoveDispatchRateWithContext removes message dispatch rate for a 
topic
        RemoveDispatchRateWithContext(context.Context, utils.TopicName) error
 
-       // GetPublishRate returns message publish rate for a topic
+       // GetPublishRate returns message publish rate for a topic.
+       // Returns nil if the publish rate is not configured at the topic level.
        GetPublishRate(utils.TopicName) (*utils.PublishRateData, error)
 
-       // GetPublishRateWithContext returns message publish rate for a topic
+       // GetPublishRateWithContext returns message publish rate for a topic.
+       // Returns nil if the publish rate is not configured at the topic level.
        GetPublishRateWithContext(context.Context, utils.TopicName) 
(*utils.PublishRateData, error)
 
        // SetPublishRate sets message publish rate for a topic
@@ -645,7 +653,8 @@ type Topics interface {
        // RemoveDeduplicationStatusWithContext removes the deduplication 
policy for a topic
        RemoveDeduplicationStatusWithContext(context.Context, utils.TopicName) 
error
 
-       // GetRetention returns the retention configuration for a topic
+       // GetRetention returns the retention configuration for a topic.
+       // Returns nil if the retention policy is not configured at the topic 
level.
        //
        // @param topic
        //        topicName struct
@@ -654,7 +663,8 @@ type Topics interface {
        //        in namespace or broker level, if no policy set in topic level
        GetRetention(topic utils.TopicName, applied bool) 
(*utils.RetentionPolicies, error)
 
-       // GetRetentionWithContext returns the retention configuration for a 
topic
+       // GetRetentionWithContext returns the retention configuration for a 
topic.
+       // Returns nil if the retention policy is not configured at the topic 
level.
        //
        // @param ctx
        //        context used for the request
@@ -821,10 +831,12 @@ type Topics interface {
        //        list of replication cluster id
        SetReplicationClustersWithContext(ctx context.Context, topic 
utils.TopicName, data []string) error
 
-       // GetSubscribeRate returns subscribe rate configuration for a topic
+       // GetSubscribeRate returns subscribe rate configuration for a topic.
+       // Returns nil if the subscribe rate is not configured at the topic 
level.
        GetSubscribeRate(utils.TopicName) (*utils.SubscribeRate, error)
 
-       // GetSubscribeRateWithContext returns subscribe rate configuration for 
a topic
+       // GetSubscribeRateWithContext returns subscribe rate configuration for 
a topic.
+       // Returns nil if the subscribe rate is not configured at the topic 
level.
        GetSubscribeRateWithContext(context.Context, utils.TopicName) 
(*utils.SubscribeRate, error)
 
        // SetSubscribeRate sets subscribe rate configuration for a topic
@@ -839,10 +851,12 @@ type Topics interface {
        // RemoveSubscribeRateWithContext removes subscribe rate configuration 
for a topic
        RemoveSubscribeRateWithContext(context.Context, utils.TopicName) error
 
-       // GetSubscriptionDispatchRate returns subscription dispatch rate for a 
topic
+       // GetSubscriptionDispatchRate returns subscription dispatch rate for a 
topic.
+       // Returns nil if the subscription dispatch rate is not configured at 
the topic level.
        GetSubscriptionDispatchRate(utils.TopicName) (*utils.DispatchRateData, 
error)
 
-       // GetSubscriptionDispatchRateWithContext returns subscription dispatch 
rate for a topic
+       // GetSubscriptionDispatchRateWithContext returns subscription dispatch 
rate for a topic.
+       // Returns nil if the subscription dispatch rate is not configured at 
the topic level.
        GetSubscriptionDispatchRateWithContext(context.Context, 
utils.TopicName) (*utils.DispatchRateData, error)
 
        // SetSubscriptionDispatchRate sets subscription dispatch rate for a 
topic
@@ -948,10 +962,12 @@ type Topics interface {
        // RemoveDeduplicationSnapshotIntervalWithContext removes deduplication 
snapshot interval for a topic
        RemoveDeduplicationSnapshotIntervalWithContext(context.Context, 
utils.TopicName) error
 
-       // GetReplicatorDispatchRate returns replicator dispatch rate for a 
topic
+       // GetReplicatorDispatchRate returns replicator dispatch rate for a 
topic.
+       // Returns nil if the replicator dispatch rate is not configured at the 
topic level.
        GetReplicatorDispatchRate(utils.TopicName) (*utils.DispatchRateData, 
error)
 
-       // GetReplicatorDispatchRateWithContext returns replicator dispatch 
rate for a topic
+       // GetReplicatorDispatchRateWithContext returns replicator dispatch 
rate for a topic.
+       // Returns nil if the replicator dispatch rate is not configured at the 
topic level.
        GetReplicatorDispatchRateWithContext(context.Context, utils.TopicName) 
(*utils.DispatchRateData, error)
 
        // SetReplicatorDispatchRate sets replicator dispatch rate for a topic
@@ -966,10 +982,12 @@ type Topics interface {
        // RemoveReplicatorDispatchRateWithContext removes replicator dispatch 
rate for a topic
        RemoveReplicatorDispatchRateWithContext(context.Context, 
utils.TopicName) error
 
-       // GetOffloadPolicies returns offload policies for a topic
+       // GetOffloadPolicies returns offload policies for a topic.
+       // Returns nil if the offload policies are not configured at the topic 
level.
        GetOffloadPolicies(utils.TopicName) (*utils.OffloadPolicies, error)
 
-       // GetOffloadPoliciesWithContext returns offload policies for a topic
+       // GetOffloadPoliciesWithContext returns offload policies for a topic.
+       // Returns nil if the offload policies are not configured at the topic 
level.
        GetOffloadPoliciesWithContext(context.Context, utils.TopicName) 
(*utils.OffloadPolicies, error)
 
        // SetOffloadPolicies sets offload policies for a topic
@@ -984,10 +1002,12 @@ type Topics interface {
        // RemoveOffloadPoliciesWithContext removes offload policies for a topic
        RemoveOffloadPoliciesWithContext(context.Context, utils.TopicName) error
 
-       // GetAutoSubscriptionCreation returns auto subscription creation 
override for a topic
+       // GetAutoSubscriptionCreation returns auto subscription creation 
override for a topic.
+       // Returns nil if the auto subscription creation override is not 
configured at the topic level.
        GetAutoSubscriptionCreation(utils.TopicName) 
(*utils.AutoSubscriptionCreationOverride, error)
 
-       // GetAutoSubscriptionCreationWithContext returns auto subscription 
creation override for a topic
+       // GetAutoSubscriptionCreationWithContext returns auto subscription 
creation override for a topic.
+       // Returns nil if the auto subscription creation override is not 
configured at the topic level.
        GetAutoSubscriptionCreationWithContext(
                context.Context,
                utils.TopicName,
@@ -1629,8 +1649,11 @@ func (t *topics) GetPersistence(topic utils.TopicName) 
(*utils.PersistenceData,
 func (t *topics) GetPersistenceWithContext(ctx context.Context, topic 
utils.TopicName) (*utils.PersistenceData, error) {
        var persistenceData utils.PersistenceData
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"persistence")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &persistenceData)
-       return &persistenceData, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&persistenceData)
+       if body != nil {
+               return &persistenceData, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetPersistence(topic utils.TopicName, persistenceData 
utils.PersistenceData) error {
@@ -1665,8 +1688,11 @@ func (t *topics) GetDelayedDeliveryWithContext(
 ) (*utils.DelayedDeliveryData, error) {
        var delayedDeliveryData utils.DelayedDeliveryData
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"delayedDelivery")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, 
&delayedDeliveryData)
-       return &delayedDeliveryData, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&delayedDeliveryData)
+       if body != nil {
+               return &delayedDeliveryData, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData 
utils.DelayedDeliveryData) error {
@@ -1701,8 +1727,11 @@ func (t *topics) GetDispatchRateWithContext(
 ) (*utils.DispatchRateData, error) {
        var dispatchRateData utils.DispatchRateData
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"dispatchRate")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRateData)
-       return &dispatchRateData, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&dispatchRateData)
+       if body != nil {
+               return &dispatchRateData, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData 
utils.DispatchRateData) error {
@@ -1734,8 +1763,11 @@ func (t *topics) GetPublishRate(topic utils.TopicName) 
(*utils.PublishRateData,
 func (t *topics) GetPublishRateWithContext(ctx context.Context, topic 
utils.TopicName) (*utils.PublishRateData, error) {
        var publishRateData utils.PublishRateData
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"publishRate")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &publishRateData)
-       return &publishRateData, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&publishRateData)
+       if body != nil {
+               return &publishRateData, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetPublishRate(topic utils.TopicName, publishRateData 
utils.PublishRateData) error {
@@ -1800,10 +1832,13 @@ func (t *topics) GetRetentionWithContext(
 ) (*utils.RetentionPolicies, error) {
        var policy utils.RetentionPolicies
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"retention")
-       _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, 
&policy, map[string]string{
+       body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, 
endpoint, &policy, map[string]string{
                "applied": strconv.FormatBool(applied),
        }, true)
-       return &policy, err
+       if body != nil {
+               return &policy, err
+       }
+       return nil, err
 }
 
 func (t *topics) RemoveRetention(topic utils.TopicName) error {
@@ -1979,8 +2014,11 @@ func (t *topics) GetSubscribeRate(topic utils.TopicName) 
(*utils.SubscribeRate,
 func (t *topics) GetSubscribeRateWithContext(ctx context.Context, topic 
utils.TopicName) (*utils.SubscribeRate, error) {
        var subscribeRate utils.SubscribeRate
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"subscribeRate")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &subscribeRate)
-       return &subscribeRate, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&subscribeRate)
+       if body != nil {
+               return &subscribeRate, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate 
utils.SubscribeRate) error {
@@ -2015,8 +2053,11 @@ func (t *topics) GetSubscriptionDispatchRateWithContext(
 ) (*utils.DispatchRateData, error) {
        var dispatchRate utils.DispatchRateData
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"subscriptionDispatchRate")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRate)
-       return &dispatchRate, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&dispatchRate)
+       if body != nil {
+               return &dispatchRate, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName, 
dispatchRate utils.DispatchRateData) error {
@@ -2212,8 +2253,11 @@ func (t *topics) GetReplicatorDispatchRateWithContext(
 ) (*utils.DispatchRateData, error) {
        var dispatchRate utils.DispatchRateData
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"replicatorDispatchRate")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRate)
-       return &dispatchRate, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&dispatchRate)
+       if body != nil {
+               return &dispatchRate, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate 
utils.DispatchRateData) error {
@@ -2248,8 +2292,11 @@ func (t *topics) GetAutoSubscriptionCreationWithContext(
 ) (*utils.AutoSubscriptionCreationOverride, error) {
        var autoSubCreation utils.AutoSubscriptionCreationOverride
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"autoSubscriptionCreation")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &autoSubCreation)
-       return &autoSubCreation, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&autoSubCreation)
+       if body != nil {
+               return &autoSubCreation, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
@@ -2316,8 +2363,11 @@ func (t *topics) GetOffloadPoliciesWithContext(
 ) (*utils.OffloadPolicies, error) {
        var offloadPolicies utils.OffloadPolicies
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"offloadPolicies")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &offloadPolicies)
-       return &offloadPolicies, err
+       body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, 
&offloadPolicies)
+       if body != nil {
+               return &offloadPolicies, err
+       }
+       return nil, err
 }
 
 func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies 
utils.OffloadPolicies) error {
diff --git a/pulsaradmin/pkg/admin/topic_test.go 
b/pulsaradmin/pkg/admin/topic_test.go
index 8493cf76..f40c0bbc 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -516,10 +516,10 @@ func TestRetention(t *testing.T) {
        err = admin.Topics().Create(*topicName, 4)
        assert.NoError(t, err)
 
+       // Initial state: policy not configured, should return nil
        topicRetentionPolicy, err := admin.Topics().GetRetention(*topicName, 
false)
        assert.NoError(t, err)
-       assert.Equal(t, int64(0), topicRetentionPolicy.RetentionSizeInMB)
-       assert.Equal(t, 0, topicRetentionPolicy.RetentionTimeInMinutes)
+       assert.Nil(t, topicRetentionPolicy, "Expected nil when retention policy 
is not configured")
        err = admin.Topics().SetRetention(*topicName, utils.RetentionPolicies{
                RetentionSizeInMB:      20480,
                RetentionTimeInMinutes: 1440,
@@ -540,13 +540,12 @@ func TestRetention(t *testing.T) {
        )
        err = admin.Topics().RemoveRetention(*topicName)
        assert.NoError(t, err)
+       // After removal, should return nil (not configured)
        assert.Eventually(
                t,
                func() bool {
                        topicRetentionPolicy, err = 
admin.Topics().GetRetention(*topicName, false)
-                       return err == nil &&
-                               topicRetentionPolicy.RetentionSizeInMB == 
int64(0) &&
-                               topicRetentionPolicy.RetentionTimeInMinutes == 0
+                       return err == nil && topicRetentionPolicy == nil
                },
                10*time.Second,
                100*time.Millisecond,
@@ -565,12 +564,10 @@ func TestSubscribeRate(t *testing.T) {
        err = admin.Topics().Create(*topicName, 4)
        assert.NoError(t, err)
 
-       // Get default subscribe rate (adapt to actual server behavior)
+       // Initial state: policy not configured, should return nil
        initialSubscribeRate, err := admin.Topics().GetSubscribeRate(*topicName)
        assert.NoError(t, err)
-       // Store initial values for later comparison instead of assuming 
specific defaults
-       initialConsumerRate := 
initialSubscribeRate.SubscribeThrottlingRatePerConsumer
-       initialRatePeriod := initialSubscribeRate.RatePeriodInSecond
+       assert.Nil(t, initialSubscribeRate, "Expected nil when subscribe rate 
is not configured")
 
        // Set new subscribe rate
        newSubscribeRate := utils.SubscribeRate{
@@ -594,16 +591,14 @@ func TestSubscribeRate(t *testing.T) {
                100*time.Millisecond,
        )
 
-       // Remove subscribe rate policy
+       // Remove subscribe rate policy - should return nil
        err = admin.Topics().RemoveSubscribeRate(*topicName)
        assert.NoError(t, err)
        assert.Eventually(
                t,
                func() bool {
                        subscribeRate, err := 
admin.Topics().GetSubscribeRate(*topicName)
-                       return err == nil &&
-                               
subscribeRate.SubscribeThrottlingRatePerConsumer == initialConsumerRate &&
-                               subscribeRate.RatePeriodInSecond == 
initialRatePeriod
+                       return err == nil && subscribeRate == nil
                },
                10*time.Second,
                100*time.Millisecond,
@@ -622,14 +617,10 @@ func TestSubscriptionDispatchRate(t *testing.T) {
        err = admin.Topics().Create(*topicName, 4)
        assert.NoError(t, err)
 
-       // Get default subscription dispatch rate (adapt to actual server 
behavior)
+       // Initial state: policy not configured, should return nil
        initialDispatchRate, err := 
admin.Topics().GetSubscriptionDispatchRate(*topicName)
        assert.NoError(t, err)
-       // Store initial values for later comparison instead of assuming 
specific defaults
-       initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg
-       initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte
-       initialRatePeriod := initialDispatchRate.RatePeriodInSecond
-       initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate
+       assert.Nil(t, initialDispatchRate, "Expected nil when subscription 
dispatch rate is not configured")
 
        // Set new subscription dispatch rate
        newDispatchRate := utils.DispatchRateData{
@@ -657,18 +648,14 @@ func TestSubscriptionDispatchRate(t *testing.T) {
                100*time.Millisecond,
        )
 
-       // Remove subscription dispatch rate policy
+       // Remove subscription dispatch rate policy - should return nil
        err = admin.Topics().RemoveSubscriptionDispatchRate(*topicName)
        assert.NoError(t, err)
        assert.Eventually(
                t,
                func() bool {
                        dispatchRate, err := 
admin.Topics().GetSubscriptionDispatchRate(*topicName)
-                       return err == nil &&
-                               dispatchRate.DispatchThrottlingRateInMsg == 
initialMsgRate &&
-                               dispatchRate.DispatchThrottlingRateInByte == 
initialByteRate &&
-                               dispatchRate.RatePeriodInSecond == 
initialRatePeriod &&
-                               dispatchRate.RelativeToPublishRate == 
initialRelativeToPublish
+                       return err == nil && dispatchRate == nil
                },
                10*time.Second,
                100*time.Millisecond,
@@ -943,14 +930,10 @@ func TestReplicatorDispatchRate(t *testing.T) {
        err = admin.Topics().Create(*topicName, 4)
        assert.NoError(t, err)
 
-       // Get default replicator dispatch rate (adapt to actual server 
behavior)
+       // Initial state: policy not configured, should return nil
        initialDispatchRate, err := 
admin.Topics().GetReplicatorDispatchRate(*topicName)
        assert.NoError(t, err)
-       // Store initial values for later comparison instead of assuming 
specific defaults
-       initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg
-       initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte
-       initialRatePeriod := initialDispatchRate.RatePeriodInSecond
-       initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate
+       assert.Nil(t, initialDispatchRate, "Expected nil when replicator 
dispatch rate is not configured")
 
        // Set new replicator dispatch rate
        newDispatchRate := utils.DispatchRateData{
@@ -978,18 +961,14 @@ func TestReplicatorDispatchRate(t *testing.T) {
                100*time.Millisecond,
        )
 
-       // Remove replicator dispatch rate policy
+       // Remove replicator dispatch rate policy - should return nil
        err = admin.Topics().RemoveReplicatorDispatchRate(*topicName)
        assert.NoError(t, err)
        assert.Eventually(
                t,
                func() bool {
                        dispatchRate, err := 
admin.Topics().GetReplicatorDispatchRate(*topicName)
-                       return err == nil &&
-                               dispatchRate.DispatchThrottlingRateInMsg == 
initialMsgRate &&
-                               dispatchRate.DispatchThrottlingRateInByte == 
initialByteRate &&
-                               dispatchRate.RatePeriodInSecond == 
initialRatePeriod &&
-                               dispatchRate.RelativeToPublishRate == 
initialRelativeToPublish
+                       return err == nil && dispatchRate == nil
                },
                10*time.Second,
                100*time.Millisecond,
@@ -1008,12 +987,10 @@ func TestOffloadPolicies(t *testing.T) {
        err = admin.Topics().Create(*topicName, 4)
        assert.NoError(t, err)
 
-       // Get default offload policies
+       // Initial state: policy not configured, should return nil
        offloadPolicies, err := admin.Topics().GetOffloadPolicies(*topicName)
        assert.NoError(t, err)
-       // Default values should be empty/default
-       assert.Equal(t, "", offloadPolicies.ManagedLedgerOffloadDriver)
-       assert.Equal(t, 0, offloadPolicies.ManagedLedgerOffloadMaxThreads)
+       assert.Nil(t, offloadPolicies, "Expected nil when offload policies are 
not configured")
 
        // Set new offload policies
        newOffloadPolicies := utils.OffloadPolicies{
@@ -1045,16 +1022,14 @@ func TestOffloadPolicies(t *testing.T) {
                100*time.Millisecond,
        )
 
-       // Remove offload policies
+       // Remove offload policies - should return nil
        err = admin.Topics().RemoveOffloadPolicies(*topicName)
        assert.NoError(t, err)
        assert.Eventually(
                t,
                func() bool {
                        offloadPolicies, err = 
admin.Topics().GetOffloadPolicies(*topicName)
-                       return err == nil &&
-                               offloadPolicies.ManagedLedgerOffloadDriver == 
"" &&
-                               offloadPolicies.ManagedLedgerOffloadMaxThreads 
== 0
+                       return err == nil && offloadPolicies == nil
                },
                10*time.Second,
                100*time.Millisecond,
@@ -1073,10 +1048,10 @@ func TestAutoSubscriptionCreation(t *testing.T) {
        err = admin.Topics().Create(*topicName, 4)
        assert.NoError(t, err)
 
-       // Get default auto subscription creation
+       // Initial state: policy not configured, should return nil
        autoSubCreation, err := 
admin.Topics().GetAutoSubscriptionCreation(*topicName)
        assert.NoError(t, err)
-       assert.Equal(t, false, autoSubCreation.AllowAutoSubscriptionCreation)
+       assert.Nil(t, autoSubCreation, "Expected nil when auto subscription 
creation is not configured")
 
        // Set auto subscription creation to true
        newAutoSubCreation := utils.AutoSubscriptionCreationOverride{
@@ -1098,15 +1073,14 @@ func TestAutoSubscriptionCreation(t *testing.T) {
                100*time.Millisecond,
        )
 
-       // Remove auto subscription creation policy
+       // Remove auto subscription creation policy - should return nil
        err = admin.Topics().RemoveAutoSubscriptionCreation(*topicName)
        assert.NoError(t, err)
        assert.Eventually(
                t,
                func() bool {
                        autoSubCreation, err = 
admin.Topics().GetAutoSubscriptionCreation(*topicName)
-                       return err == nil &&
-                               autoSubCreation.AllowAutoSubscriptionCreation 
== false
+                       return err == nil && autoSubCreation == nil
                },
                10*time.Second,
                100*time.Millisecond,
@@ -1369,3 +1343,214 @@ func TestTopics_MaxUnackMessagesPerSubscription(t 
*testing.T) {
        err = admin.Topics().RemoveMaxUnackMessagesPerSubscription(*topicName)
        assert.NoError(t, err)
 }
+
+func TestTopics_Persistence(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // Initial state: policy not configured, should return nil
+       persistence, err := admin.Topics().GetPersistence(*topicName)
+       assert.NoError(t, err)
+       assert.Nil(t, persistence, "Expected nil when persistence is not 
configured")
+
+       // Set new persistence policy
+       newPersistence := utils.PersistenceData{
+               BookkeeperEnsemble:             1,
+               BookkeeperWriteQuorum:          1,
+               BookkeeperAckQuorum:            1,
+               ManagedLedgerMaxMarkDeleteRate: 10.0,
+       }
+       err = admin.Topics().SetPersistence(*topicName, newPersistence)
+       assert.NoError(t, err)
+
+       // Verify persistence is set
+       assert.Eventually(
+               t,
+               func() bool {
+                       persistence, err = 
admin.Topics().GetPersistence(*topicName)
+                       return err == nil && persistence != nil &&
+                               persistence.BookkeeperEnsemble == 1 &&
+                               persistence.BookkeeperWriteQuorum == 1
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Remove persistence policy - should return nil
+       err = admin.Topics().RemovePersistence(*topicName)
+       assert.NoError(t, err)
+       assert.Eventually(
+               t,
+               func() bool {
+                       persistence, err = 
admin.Topics().GetPersistence(*topicName)
+                       return err == nil && persistence == nil
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+}
+
+func TestTopics_DelayedDelivery(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // Initial state: policy not configured, should return nil
+       delayedDelivery, err := admin.Topics().GetDelayedDelivery(*topicName)
+       assert.NoError(t, err)
+       assert.Nil(t, delayedDelivery, "Expected nil when delayed delivery is 
not configured")
+
+       // Set new delayed delivery policy
+       newDelayedDelivery := utils.DelayedDeliveryData{
+               Active:           true,
+               TickTime:         1000,
+               MaxDelayInMillis: 60000,
+       }
+       err = admin.Topics().SetDelayedDelivery(*topicName, newDelayedDelivery)
+       assert.NoError(t, err)
+
+       // Verify delayed delivery is set
+       assert.Eventually(
+               t,
+               func() bool {
+                       delayedDelivery, err = 
admin.Topics().GetDelayedDelivery(*topicName)
+                       return err == nil && delayedDelivery != nil &&
+                               delayedDelivery.Active == true &&
+                               delayedDelivery.TickTime == 1000
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Remove delayed delivery policy - should return nil
+       err = admin.Topics().RemoveDelayedDelivery(*topicName)
+       assert.NoError(t, err)
+       assert.Eventually(
+               t,
+               func() bool {
+                       delayedDelivery, err = 
admin.Topics().GetDelayedDelivery(*topicName)
+                       return err == nil && delayedDelivery == nil
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+}
+
+func TestTopics_DispatchRate(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // Initial state: policy not configured, should return nil
+       dispatchRate, err := admin.Topics().GetDispatchRate(*topicName)
+       assert.NoError(t, err)
+       assert.Nil(t, dispatchRate, "Expected nil when dispatch rate is not 
configured")
+
+       // Set new dispatch rate
+       newDispatchRate := utils.DispatchRateData{
+               DispatchThrottlingRateInMsg:  100,
+               DispatchThrottlingRateInByte: 1048576,
+               RatePeriodInSecond:           60,
+               RelativeToPublishRate:        false,
+       }
+       err = admin.Topics().SetDispatchRate(*topicName, newDispatchRate)
+       assert.NoError(t, err)
+
+       // Verify dispatch rate is set
+       assert.Eventually(
+               t,
+               func() bool {
+                       dispatchRate, err = 
admin.Topics().GetDispatchRate(*topicName)
+                       return err == nil && dispatchRate != nil &&
+                               dispatchRate.DispatchThrottlingRateInMsg == 100
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Remove dispatch rate policy - should return nil
+       err = admin.Topics().RemoveDispatchRate(*topicName)
+       assert.NoError(t, err)
+       assert.Eventually(
+               t,
+               func() bool {
+                       dispatchRate, err = 
admin.Topics().GetDispatchRate(*topicName)
+                       return err == nil && dispatchRate == nil
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+}
+
+func TestTopics_PublishRate(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // Initial state: policy not configured, should return nil
+       publishRate, err := admin.Topics().GetPublishRate(*topicName)
+       assert.NoError(t, err)
+       assert.Nil(t, publishRate, "Expected nil when publish rate is not 
configured")
+
+       // Set new publish rate
+       newPublishRate := utils.PublishRateData{
+               PublishThrottlingRateInMsg:  200,
+               PublishThrottlingRateInByte: 2097152,
+       }
+       err = admin.Topics().SetPublishRate(*topicName, newPublishRate)
+       assert.NoError(t, err)
+
+       // Verify publish rate is set
+       assert.Eventually(
+               t,
+               func() bool {
+                       publishRate, err = 
admin.Topics().GetPublishRate(*topicName)
+                       return err == nil && publishRate != nil &&
+                               publishRate.PublishThrottlingRateInMsg == 200
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Remove publish rate policy - should return nil
+       err = admin.Topics().RemovePublishRate(*topicName)
+       assert.NoError(t, err)
+       assert.Eventually(
+               t,
+               func() bool {
+                       publishRate, err = 
admin.Topics().GetPublishRate(*topicName)
+                       return err == nil && publishRate == nil
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+}
diff --git a/pulsaradmin/pkg/rest/client.go b/pulsaradmin/pkg/rest/client.go
index ffb0c29a..82e460b1 100644
--- a/pulsaradmin/pkg/rest/client.go
+++ b/pulsaradmin/pkg/rest/client.go
@@ -141,6 +141,10 @@ func (c *Client) GetWithContext(ctx context.Context, 
endpoint string, obj interf
        return err
 }
 
+func (c *Client) GetBodyWithContext(ctx context.Context, endpoint string, obj 
interface{}) ([]byte, error) {
+       return c.GetWithQueryParamsWithContext(ctx, endpoint, obj, nil, true)
+}
+
 func (c *Client) GetWithQueryParams(endpoint string, obj interface{}, params 
map[string]string,
        decode bool) ([]byte, error) {
        return c.GetWithQueryParamsWithContext(context.Background(), endpoint, 
obj, params, decode)
@@ -190,12 +194,14 @@ func (c *Client) GetWithOptionsWithContext(
        defer safeRespClose(resp)
 
        if obj != nil {
-               if err := decodeJSONBody(resp, &obj); err != nil {
+               body, err := decodeJSONWithBody(resp, &obj)
+               if err != nil {
                        if err == io.EOF {
                                return nil, nil
                        }
                        return nil, err
                }
+               return body, nil
        } else if !decode {
                if file != nil {
                        _, err := io.Copy(file, resp.Body)
@@ -533,6 +539,25 @@ func decodeJSONBody(resp *http.Response, out interface{}) 
error {
        return dec.Decode(out)
 }
 
+// decodeJSONWithBody is used to JSON decode a body AND ALSO return the raw 
body bytes
+func decodeJSONWithBody(resp *http.Response, out interface{}) ([]byte, error) {
+       // Read the body first so we can return it even after decoding
+       body, err := io.ReadAll(resp.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       if len(body) == 0 {
+               return nil, nil
+       }
+
+       if err := json.Unmarshal(body, &out); err != nil {
+               return nil, err
+       }
+
+       return body, nil
+}
+
 // safeRespClose is used to close a response body
 func safeRespClose(resp *http.Response) {
        if resp != nil {


Reply via email to