This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5cdb227c Support update or remove topic properties (#1381)
5cdb227c is described below
commit 5cdb227c6fdf5dc4513348241e4249b1812bad3b
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jun 9 19:49:34 2025 +0800
Support update or remove topic properties (#1381)
---
pulsaradmin/pkg/admin/topic.go | 16 +++++++++++++
pulsaradmin/pkg/admin/topic_test.go | 46 +++++++++++++++++++++++++++++--------
2 files changed, 52 insertions(+), 10 deletions(-)
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index 82cfc87e..c7fe2225 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -51,6 +51,12 @@ type Topics interface {
// GetProperties returns the properties of a topic
GetProperties(topic utils.TopicName) (map[string]string, error)
+ // UpdateProperties updates the properties of a topic
+ UpdateProperties(topic utils.TopicName, properties map[string]string)
error
+
+ // RemoveProperty removes a property with the given key of a topic
+ RemoveProperty(topic utils.TopicName, key string) error
+
// Delete a topic, this function can delete both partitioned or non-partitioned topic
//
// @param topic
@@ -433,6 +439,16 @@ func (t *topics) GetProperties(topic utils.TopicName) (map[string]string, error)
return properties, err
}
+func (t *topics) UpdateProperties(topic utils.TopicName, properties
map[string]string) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"properties")
+ return t.pulsar.Client.Put(endpoint, properties)
+}
+
+func (t *topics) RemoveProperty(topic utils.TopicName, key string) error {
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"properties")
+ return t.pulsar.Client.DeleteWithQueryParams(endpoint,
map[string]string{"key": key})
+}
+
func (t *topics) Delete(topic utils.TopicName, force bool, nonPartitioned
bool) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"partitions")
if nonPartitioned {
diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
index 734cce1c..1e80f789 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -65,37 +65,63 @@ func TestCreateTopic(t *testing.T) {
t.Error("Couldn't find topic: " + topic)
}
-func TestTopics_CreateWithProperties(t *testing.T) {
+func TestTopics_Properties(t *testing.T) {
+ t.Run("NonPartitioned", func(t *testing.T) {
+ internalTestTopicsProperties(t, 0)
+ })
+ t.Run("Partitioned", func(t *testing.T) {
+ internalTestTopicsProperties(t, 4)
+ })
+}
+
+func verifyTopicProperties(t *testing.T, admin Client, topic *utils.TopicName,
+ expected map[string]string) {
+ properties, err := admin.Topics().GetProperties(*topic)
+ assert.NoError(t, err)
+ assert.Equal(t, expected, properties)
+}
+
+func internalTestTopicsProperties(t *testing.T, partitions int) {
topic := newTopicName()
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
- // Create non-partition topic
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
- err = admin.Topics().CreateWithProperties(*topicName, 0,
map[string]string{
+ err = admin.Topics().CreateWithProperties(*topicName, partitions,
map[string]string{
"key1": "value1",
})
assert.NoError(t, err)
+ verifyTopicProperties(t, admin, topicName, map[string]string{"key1":
"value1"})
properties, err := admin.Topics().GetProperties(*topicName)
assert.NoError(t, err)
assert.Equal(t, properties["key1"], "value1")
- // Create partition topic
- topic = newTopicName()
- topicName, err = utils.GetTopicName(topic)
+ newProperties := map[string]string{
+ "key1": "value1-updated",
+ "key2": "value2",
+ }
+ err = admin.Topics().UpdateProperties(*topicName, newProperties)
+ assert.NoError(t, err)
+ verifyTopicProperties(t, admin, topicName, newProperties)
+
+ err = admin.Topics().UpdateProperties(*topicName,
map[string]string{"key3": "value3"})
assert.NoError(t, err)
- err = admin.Topics().CreateWithProperties(*topicName, 4,
map[string]string{
+ verifyTopicProperties(t, admin, topicName, map[string]string{
+ "key1": "value1-updated",
"key2": "value2",
+ "key3": "value3",
})
- assert.NoError(t, err)
- properties, err = admin.Topics().GetProperties(*topicName)
+ err = admin.Topics().RemoveProperty(*topicName, "key1")
assert.NoError(t, err)
- assert.Equal(t, properties["key2"], "value2")
+ verifyTopicProperties(t, admin, topicName, map[string]string{
+ "key2": "value2",
+ "key3": "value3",
+ })
}
func TestPartitionState(t *testing.T) {