This is an automated email from the ASF dual-hosted git repository.
gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 2bcdaf51b feat(go-client): add ListNodes as admin API for meta (#1939)
2bcdaf51b is described below
commit 2bcdaf51bd33f431ce905e61bd8cab9a2185c29b
Author: Dan Wang <[email protected]>
AuthorDate: Mon Mar 11 11:08:36 2024 +0800
feat(go-client): add ListNodes as admin API for meta (#1939)
---
go-client/admin/client.go | 30 +++++++++++++++++++++
go-client/admin/client_test.go | 55 ++++++++++++++++++++++++++++++++++-----
go-client/idl/base/rpc_address.go | 6 ++---
3 files changed, 82 insertions(+), 9 deletions(-)
diff --git a/go-client/admin/client.go b/go-client/admin/client.go
index 77851f477..d51a0b52a 100644
--- a/go-client/admin/client.go
+++ b/go-client/admin/client.go
@@ -54,6 +54,10 @@ type Client interface {
// Empty `args` means "list all available tables"; Otherwise, the only
parameter would
// specify the status of the returned tables.
ListTables(args ...interface{}) ([]*replication.AppInfo, error)
+
+ // Empty `args` means "list all alive nodes"; Otherwise, the only
parameter would
+ // specify the status of the returned nodes.
+ ListNodes(args ...interface{}) ([]*admin.NodeInfo, error)
}
type Config struct {
@@ -234,3 +238,29 @@ func (c *rpcBasedClient) ListTables(args ...interface{})
([]*replication.AppInfo
}
return c.listTables(args[0].(replication.AppStatus))
}
+
+func (c *rpcBasedClient) listNodes(status admin.NodeStatus)
([]*admin.NodeInfo, error) {
+ req := &admin.ConfigurationListNodesRequest{
+ Status: status,
+ }
+
+ var nodes []*admin.NodeInfo
+ var respErr error
+ err := c.callMeta("ListNodes", req, func(iresp interface{}) {
+ resp := iresp.(*admin.ConfigurationListNodesResponse)
+ nodes = resp.Infos
+ respErr = base.GetResponseError(resp)
+ })
+ if err != nil {
+ return nodes, err
+ }
+
+ return nodes, respErr
+}
+
+func (c *rpcBasedClient) ListNodes(args ...interface{}) ([]*admin.NodeInfo,
error) {
+ if len(args) == 0 {
+ return c.listNodes(admin.NodeStatus_NS_ALIVE)
+ }
+ return c.listNodes(args[0].(admin.NodeStatus))
+}
diff --git a/go-client/admin/client_test.go b/go-client/admin/client_test.go
index 8fdd13a4f..6d8b12ade 100644
--- a/go-client/admin/client_test.go
+++ b/go-client/admin/client_test.go
@@ -25,6 +25,7 @@ import (
"testing"
"time"
+ "github.com/apache/incubator-pegasus/go-client/idl/admin"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegasus"
"github.com/stretchr/testify/assert"
@@ -43,6 +44,22 @@ func defaultConfig() Config {
}
}
+func defaultReplicaServerPorts() []int {
+ return []int{34801, 34802, 34803}
+}
+
+func timeoutConfig() Config {
+ return Config{
+ MetaServers: []string{"0.0.0.0:123456"},
+ Timeout: 500 * time.Millisecond,
+ }
+}
+
+func testAdmin_Timeout(t *testing.T, exec func(c Client) error) {
+ c := NewClient(timeoutConfig())
+ assert.Equal(t, context.DeadlineExceeded, exec(c))
+}
+
func TestAdmin_Table(t *testing.T) {
c := NewClient(defaultConfig())
@@ -75,13 +92,10 @@ func TestAdmin_Table(t *testing.T) {
}
func TestAdmin_ListTablesTimeout(t *testing.T) {
- c := NewClient(Config{
- MetaServers: []string{"0.0.0.0:123456"},
- Timeout: 500 * time.Millisecond,
+ testAdmin_Timeout(t, func(c Client) error {
+ _, err := c.ListTables()
+ return err
})
-
- _, err := c.ListTables()
- assert.Equal(t, err, context.DeadlineExceeded)
}
// Ensures after the call `CreateTable` ends, the table must be right
available to access.
@@ -145,3 +159,32 @@ func TestAdmin_GetAppEnvs(t *testing.T) {
assert.Empty(t, tb.Envs)
}
}
+
+func TestAdmin_ListNodes(t *testing.T) {
+ c := NewClient(defaultConfig())
+
+ nodes, err := c.ListNodes()
+ assert.Nil(t, err)
+
+ expectedReplicaServerPorts := defaultReplicaServerPorts()
+
+ // Compare slice length.
+ assert.Equal(t, len(expectedReplicaServerPorts), len(nodes))
+
+ actualReplicaServerPorts := make([]int, len(nodes))
+ for i, node := range nodes {
+ // Each node should be alive.
+ assert.Equal(t, admin.NodeStatus_NS_ALIVE, node.Status)
+ actualReplicaServerPorts[i] = node.Address.GetPort()
+ }
+
+ // Match elements without extra ordering.
+ assert.ElementsMatch(t, expectedReplicaServerPorts,
actualReplicaServerPorts)
+}
+
+func TestAdmin_ListNodesTimeout(t *testing.T) {
+ testAdmin_Timeout(t, func(c Client) error {
+ _, err := c.ListNodes()
+ return err
+ })
+}
diff --git a/go-client/idl/base/rpc_address.go
b/go-client/idl/base/rpc_address.go
index 9121cd5cd..d451c4228 100644
--- a/go-client/idl/base/rpc_address.go
+++ b/go-client/idl/base/rpc_address.go
@@ -57,16 +57,16 @@ func (r *RPCAddress) String() string {
return fmt.Sprintf("RPCAddress(%s)", r.GetAddress())
}
-func (r *RPCAddress) getIp() net.IP {
+func (r *RPCAddress) GetIP() net.IP {
return net.IPv4(byte(0xff&(r.address>>56)), byte(0xff&(r.address>>48)),
byte(0xff&(r.address>>40)), byte(0xff&(r.address>>32)))
}
-func (r *RPCAddress) getPort() int {
+func (r *RPCAddress) GetPort() int {
return int(0xffff & (r.address >> 16))
}
func (r *RPCAddress) GetAddress() string {
- return fmt.Sprintf("%s:%d", r.getIp(), r.getPort())
+ return fmt.Sprintf("%s:%d", r.GetIP(), r.GetPort())
}
func (r *RPCAddress) GetRawAddress() int64 {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]