This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ab6584b [ISSUE #1205]Exposing cluster from admin (#1221)
ab6584b is described below
commit ab6584b3b3f95e050d4ef99c84ab3c3c9e80d059
Author: takagi <[email protected]>
AuthorDate: Tue Sep 9 19:20:10 2025 +0800
[ISSUE #1205]Exposing cluster from admin (#1221)
Co-authored-by: weilin <[email protected]>
---
admin/admin.go | 5 +++++
internal/mock_namesrv.go | 15 +++++++++++++++
internal/namesrv.go | 2 ++
internal/route.go | 16 ++++++++++++++++
4 files changed, 38 insertions(+)
diff --git a/admin/admin.go b/admin/admin.go
index a92a25c..04a08a5 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -38,6 +38,7 @@ type Admin interface {
FetchAllTopicList(ctx context.Context) (*TopicList, error)
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand,
error)
FetchPublishMessageQueues(ctx context.Context, topic string)
([]*primitive.MessageQueue, error)
+ FetchClusterList(topic string) ([]string, error)
Close() error
}
@@ -276,6 +277,10 @@ func (a *admin) FetchPublishMessageQueues(ctx
context.Context, topic string) ([]
return
a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace,
topic))
}
+// FetchClusterList returns the cluster list that the name-server client
+// reports for topic.
+//
+// The topic is wrapped with the admin's configured namespace before the
+// lookup, the same way FetchPublishMessageQueues does, so that a namespaced
+// admin resolves the same topic through both methods.
+func (a *admin) FetchClusterList(topic string) ([]string, error) {
+	return a.cli.GetNameSrv().FetchClusterList(utils.WrapNamespace(a.opts.Namespace, topic))
+}
+
func (a *admin) Close() error {
a.closeOnce.Do(func() {
a.cli.Shutdown()
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 7ce6f97..abfbf3e 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -191,3 +191,18 @@ func (mr *MockNamesrvsMockRecorder) AddrList()
*gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList",
reflect.TypeOf((*MockNamesrvs)(nil).AddrList))
}
+
+// FetchClusterList mocks base method. The call is forwarded to the gomock
+// controller and its recorded return values are converted to
+// ([]string, error).
+func (m *MockNamesrvs) FetchClusterList(topic string) ([]string, error) {
+	m.ctrl.T.Helper()
+	results := m.ctrl.Call(m, "FetchClusterList", topic)
+	// Comma-ok assertions: a return value of another type (or nil) yields
+	// the zero value instead of panicking.
+	clusters, _ := results[0].([]string)
+	err, _ := results[1].(error)
+	return clusters, err
+}
+
+// FetchClusterList indicates an expected call of FetchClusterList. The topic
+// argument is passed through to the controller as given.
+func (mr *MockNamesrvsMockRecorder) FetchClusterList(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	// Method type of the mocked method, handed to the controller together
+	// with the method name.
+	methodType := reflect.TypeOf((*MockNamesrvs)(nil).FetchClusterList)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchClusterList", methodType, topic)
+}
diff --git a/internal/namesrv.go b/internal/namesrv.go
index c347998..ac7643d 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -62,6 +62,8 @@ type Namesrvs interface {
FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue,
error)
+ FetchClusterList(topic string) ([]string, error)
+
AddrList() []string
}
diff --git a/internal/route.go b/internal/route.go
index 56f764f..4af8619 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -530,6 +530,22 @@ func (s *namesrvs) routeData2PublishInfo(topic string,
data *TopicRouteData) *To
return publishInfo
}
+// FetchClusterList queries the route data for topic and returns the distinct
+// cluster names found in its broker list, in the order in which each cluster
+// first appears there.
+//
+// If the route query fails, the error is returned unchanged together with a
+// nil slice. When the broker list is empty the result is an empty, non-nil
+// slice.
+func (s *namesrvs) FetchClusterList(topic string) ([]string, error) {
+	routeData, err := s.queryTopicRouteInfoFromServer(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	brokers := routeData.BrokerDataList
+	seen := make(map[string]struct{}, len(brokers))
+	clusterList := make([]string, 0, len(brokers))
+	for _, bd := range brokers {
+		// Append on first sight rather than ranging over the set afterwards:
+		// Go randomizes map iteration order, which would make the order of
+		// the returned clusters differ from call to call.
+		if _, dup := seen[bd.Cluster]; dup {
+			continue
+		}
+		seen[bd.Cluster] = struct{}{}
+		clusterList = append(clusterList, bd.Cluster)
+	}
+	return clusterList, nil
+}
+
// TopicRouteData TopicRouteData
type TopicRouteData struct {
OrderTopicConf string