This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cdb227c Support update or remove topic properties (#1381)
5cdb227c is described below

commit 5cdb227c6fdf5dc4513348241e4249b1812bad3b
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jun 9 19:49:34 2025 +0800

    Support update or remove topic properties (#1381)
---
 pulsaradmin/pkg/admin/topic.go      | 16 +++++++++++++
 pulsaradmin/pkg/admin/topic_test.go | 46 +++++++++++++++++++++++++++++--------
 2 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index 82cfc87e..c7fe2225 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -51,6 +51,12 @@ type Topics interface {
        // GetProperties returns the properties of a topic
        GetProperties(topic utils.TopicName) (map[string]string, error)
 
+       // UpdateProperties updates the properties of a topic
+       UpdateProperties(topic utils.TopicName, properties map[string]string) error
+
+       // RemoveProperty removes a property with the given key of a topic
+       RemoveProperty(topic utils.TopicName, key string) error
+
        // Delete a topic, this function can delete both partitioned or non-partitioned topic
        //
        // @param topic
@@ -433,6 +439,16 @@ func (t *topics) GetProperties(topic utils.TopicName) (map[string]string, error)
        return properties, err
 }
 
+func (t *topics) UpdateProperties(topic utils.TopicName, properties map[string]string) error {
+       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
+       return t.pulsar.Client.Put(endpoint, properties)
+}
+
+func (t *topics) RemoveProperty(topic utils.TopicName, key string) error {
+       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
+       return t.pulsar.Client.DeleteWithQueryParams(endpoint, map[string]string{"key": key})
+}
+
 func (t *topics) Delete(topic utils.TopicName, force bool, nonPartitioned bool) error {
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
        if nonPartitioned {
diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
index 734cce1c..1e80f789 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -65,37 +65,63 @@ func TestCreateTopic(t *testing.T) {
        t.Error("Couldn't find topic: " + topic)
 }
 
-func TestTopics_CreateWithProperties(t *testing.T) {
+func TestTopics_Properties(t *testing.T) {
+       t.Run("NonPartitioned", func(t *testing.T) {
+               internalTestTopicsProperties(t, 0)
+       })
+       t.Run("Partitioned", func(t *testing.T) {
+               internalTestTopicsProperties(t, 4)
+       })
+}
+
+func verifyTopicProperties(t *testing.T, admin Client, topic *utils.TopicName,
+       expected map[string]string) {
+       properties, err := admin.Topics().GetProperties(*topic)
+       assert.NoError(t, err)
+       assert.Equal(t, expected, properties)
+}
+
+func internalTestTopicsProperties(t *testing.T, partitions int) {
        topic := newTopicName()
        cfg := &config.Config{}
        admin, err := New(cfg)
        assert.NoError(t, err)
        assert.NotNil(t, admin)
 
-       // Create non-partition topic
        topicName, err := utils.GetTopicName(topic)
        assert.NoError(t, err)
-       err = admin.Topics().CreateWithProperties(*topicName, 0, map[string]string{
+       err = admin.Topics().CreateWithProperties(*topicName, partitions, map[string]string{
                "key1": "value1",
        })
        assert.NoError(t, err)
+       verifyTopicProperties(t, admin, topicName, map[string]string{"key1": "value1"})
 
        properties, err := admin.Topics().GetProperties(*topicName)
        assert.NoError(t, err)
        assert.Equal(t, properties["key1"], "value1")
 
-       // Create partition topic
-       topic = newTopicName()
-       topicName, err = utils.GetTopicName(topic)
+       newProperties := map[string]string{
+               "key1": "value1-updated",
+               "key2": "value2",
+       }
+       err = admin.Topics().UpdateProperties(*topicName, newProperties)
+       assert.NoError(t, err)
+       verifyTopicProperties(t, admin, topicName, newProperties)
+
+       err = admin.Topics().UpdateProperties(*topicName, map[string]string{"key3": "value3"})
        assert.NoError(t, err)
-       err = admin.Topics().CreateWithProperties(*topicName, 4, map[string]string{
+       verifyTopicProperties(t, admin, topicName, map[string]string{
+               "key1": "value1-updated",
                "key2": "value2",
+               "key3": "value3",
        })
-       assert.NoError(t, err)
 
-       properties, err = admin.Topics().GetProperties(*topicName)
+       err = admin.Topics().RemoveProperty(*topicName, "key1")
        assert.NoError(t, err)
-       assert.Equal(t, properties["key2"], "value2")
+       verifyTopicProperties(t, admin, topicName, map[string]string{
+               "key2": "value2",
+               "key3": "value3",
+       })
 }
 
 func TestPartitionState(t *testing.T) {

Reply via email to