This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 24ee71c28 refactor: support close admin-cli client (#2162)
24ee71c28 is described below
commit 24ee71c281ebdbb72202a60e75e5723eddafa1be
Author: Pengfan Lu <[email protected]>
AuthorDate: Fri Jan 3 09:29:48 2025 +0800
refactor: support close admin-cli client (#2162)
Support closing the go-client, disconnecting its TCP connections to the replica servers.
---
admin-cli/cmd/init.go | 2 +-
admin-cli/executor/client.go | 32 ++++++++++++++++++++--
admin-cli/executor/disk_info.go | 6 ++--
.../executor/toolkits/tablemigrator/switcher.go | 3 +-
admin-cli/util/pegasus_node.go | 21 ++++++++++++++
5 files changed, 56 insertions(+), 8 deletions(-)
diff --git a/admin-cli/cmd/init.go b/admin-cli/cmd/init.go
index f910d67eb..50fcb2839 100644
--- a/admin-cli/cmd/init.go
+++ b/admin-cli/cmd/init.go
@@ -34,5 +34,5 @@ var pegasusClient *executor.Client
func Init(metaList []string) {
globalMetaList = metaList
- pegasusClient = executor.NewClient(os.Stdout, globalMetaList)
+ pegasusClient, _ = executor.NewClient(os.Stdout, globalMetaList, true)
}
diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go
index b41c7189f..62998177a 100644
--- a/admin-cli/executor/client.go
+++ b/admin-cli/executor/client.go
@@ -23,6 +23,7 @@ import (
"fmt"
"io"
"os"
+ "strings"
"github.com/apache/incubator-pegasus/admin-cli/client"
"github.com/apache/incubator-pegasus/admin-cli/util"
@@ -45,14 +46,18 @@ type Client struct {
}
// NewClient creates a client for accessing Pegasus cluster for use of
admin-cli.
-func NewClient(writer io.Writer, metaAddrs []string) *Client {
+// When listing nodes fails, willExit == true means call os.Exit().
+func NewClient(writer io.Writer, metaAddrs []string, willExit bool) (*Client,
error) {
meta := client.NewRPCBasedMeta(metaAddrs)
// TODO(wutao): initialize replica-nodes lazily
nodes, err := meta.ListNodes()
if err != nil {
- fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err)
- os.Exit(1)
+ fmt.Printf("Error: failed to list nodes [%s]\n", err)
+ if willExit {
+ os.Exit(1)
+ }
+ return nil, fmt.Errorf("failed to list nodes [%s]", err)
}
var replicaAddrs []string
@@ -65,5 +70,26 @@ func NewClient(writer io.Writer, metaAddrs []string) *Client
{
Meta: meta,
Nodes: util.NewPegasusNodeManager(metaAddrs, replicaAddrs),
Perf: aggregate.NewPerfClient(metaAddrs),
+ }, nil
+}
+
+func CloseClient(client *Client) error {
+ var errorStrings []string
+ err := client.Meta.Close()
+ if err != nil {
+ fmt.Printf("Error: failed to close meta session [%s].\n", err)
+ errorStrings = append(errorStrings, err.Error())
+ }
+
+ client.Perf.Close()
+
+ err = client.Nodes.CloseAllNodes()
+ if err != nil {
+ fmt.Printf("Error: failed to close nodes session [%s].\n", err)
+ errorStrings = append(errorStrings, err.Error())
+ }
+ if len(errorStrings) != 0 {
+ return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
}
+ return nil
}
diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go
index a7793bb4d..83c49511d 100644
--- a/admin-cli/executor/disk_info.go
+++ b/admin-cli/executor/disk_info.go
@@ -45,7 +45,7 @@ func QueryDiskInfo(client *Client, infoType DiskInfoType,
replicaServer string,
}
func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string,
tableName string, diskTag string, print bool) ([]interface{}, error) {
- resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName)
+ resp, err := SendQueryDiskInfoRequest(client, replicaServer, tableName)
if err != nil {
return nil, err
}
@@ -60,7 +60,7 @@ func GetDiskInfo(client *Client, infoType DiskInfoType,
replicaServer string, ta
}
}
-func sendQueryDiskInfoRequest(client *Client, replicaServer string, tableName
string) (*radmin.QueryDiskInfoResponse, error) {
+func SendQueryDiskInfoRequest(client *Client, replicaServer string, tableName
string) (*radmin.QueryDiskInfoResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
@@ -88,7 +88,7 @@ func QueryAllNodesDiskInfo(client *Client, tableName string)
(map[string]*radmin
}
for _, nodeInfo := range nodeInfos {
address := nodeInfo.GetAddress().GetAddress()
- resp, err := sendQueryDiskInfoRequest(client, address,
tableName)
+ resp, err := SendQueryDiskInfoRequest(client, address,
tableName)
if err != nil {
return respMap, err
}
diff --git a/admin-cli/executor/toolkits/tablemigrator/switcher.go
b/admin-cli/executor/toolkits/tablemigrator/switcher.go
index 76c223a98..41f2be6d0 100644
--- a/admin-cli/executor/toolkits/tablemigrator/switcher.go
+++ b/admin-cli/executor/toolkits/tablemigrator/switcher.go
@@ -59,7 +59,8 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string,
zkRoot string, tabl
originMeta := client.Meta
targetAddrList := strings.Split(targetAddrs, ",")
- targetMeta := executor.NewClient(os.Stdout, targetAddrList).Meta
+ pegasusClient, _ := executor.NewClient(os.Stdout, targetAddrList, true)
+ targetMeta := pegasusClient.Meta
env := map[string]string{
"replica.deny_client_request": "reconfig*all",
}
diff --git a/admin-cli/util/pegasus_node.go b/admin-cli/util/pegasus_node.go
index 5f7320820..e0554008f 100644
--- a/admin-cli/util/pegasus_node.go
+++ b/admin-cli/util/pegasus_node.go
@@ -84,6 +84,13 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress {
return base.NewRPCAddress(n.IP, n.Port)
}
+func (n *PegasusNode) Close() error {
+ if n.session != nil {
+ return n.session.Close()
+ }
+ return nil
+}
+
// NewNodeFromTCPAddr creates a node from tcp address.
// NOTE:
// - Will not initialize TCP connection unless needed.
@@ -211,3 +218,17 @@ func (m *PegasusNodeManager) GetPerfSession(addr string,
ntype session.NodeType)
return aggregate.WrapPerf(addr, node.session)
}
+
+func (m *PegasusNodeManager) CloseAllNodes() error {
+ var errorStrings []string
+ for _, n := range m.nodes {
+ err := n.Close()
+ if err != nil {
+ errorStrings = append(errorStrings, err.Error())
+ }
+ }
+ if len(errorStrings) != 0 {
+ return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
+ }
+ return nil
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]