This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 24ee71c28 refactor: support close admin-cli client (#2162)
24ee71c28 is described below

commit 24ee71c281ebdbb72202a60e75e5723eddafa1be
Author: Pengfan Lu <[email protected]>
AuthorDate: Fri Jan 3 09:29:48 2025 +0800

    refactor: support close admin-cli client (#2162)
    
    Support closing the go-client, which disconnects its TCP connections to the replica servers.
---
 admin-cli/cmd/init.go                              |  2 +-
 admin-cli/executor/client.go                       | 32 ++++++++++++++++++++--
 admin-cli/executor/disk_info.go                    |  6 ++--
 .../executor/toolkits/tablemigrator/switcher.go    |  3 +-
 admin-cli/util/pegasus_node.go                     | 21 ++++++++++++++
 5 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/admin-cli/cmd/init.go b/admin-cli/cmd/init.go
index f910d67eb..50fcb2839 100644
--- a/admin-cli/cmd/init.go
+++ b/admin-cli/cmd/init.go
@@ -34,5 +34,5 @@ var pegasusClient *executor.Client
 func Init(metaList []string) {
        globalMetaList = metaList
 
-       pegasusClient = executor.NewClient(os.Stdout, globalMetaList)
+       pegasusClient, _ = executor.NewClient(os.Stdout, globalMetaList, true)
 }
diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go
index b41c7189f..62998177a 100644
--- a/admin-cli/executor/client.go
+++ b/admin-cli/executor/client.go
@@ -23,6 +23,7 @@ import (
        "fmt"
        "io"
        "os"
+       "strings"
 
        "github.com/apache/incubator-pegasus/admin-cli/client"
        "github.com/apache/incubator-pegasus/admin-cli/util"
@@ -45,14 +46,18 @@ type Client struct {
 }
 
 // NewClient creates a client for accessing Pegasus cluster for use of 
admin-cli.
-func NewClient(writer io.Writer, metaAddrs []string) *Client {
+// When listing nodes fails, willExit == true means call os.Exit().
+func NewClient(writer io.Writer, metaAddrs []string, willExit bool) (*Client, 
error) {
        meta := client.NewRPCBasedMeta(metaAddrs)
 
        // TODO(wutao): initialize replica-nodes lazily
        nodes, err := meta.ListNodes()
        if err != nil {
-               fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err)
-               os.Exit(1)
+               fmt.Printf("Error: failed to list nodes [%s]\n", err)
+               if willExit {
+                       os.Exit(1)
+               }
+               return nil, fmt.Errorf("failed to list nodes [%s]", err)
        }
 
        var replicaAddrs []string
@@ -65,5 +70,26 @@ func NewClient(writer io.Writer, metaAddrs []string) *Client 
{
                Meta:   meta,
                Nodes:  util.NewPegasusNodeManager(metaAddrs, replicaAddrs),
                Perf:   aggregate.NewPerfClient(metaAddrs),
+       }, nil
+}
+
+func CloseClient(client *Client) error {
+       var errorStrings []string
+       err := client.Meta.Close()
+       if err != nil {
+               fmt.Printf("Error: failed to close meta session [%s].\n", err)
+               errorStrings = append(errorStrings, err.Error())
+       }
+
+       client.Perf.Close()
+
+       err = client.Nodes.CloseAllNodes()
+       if err != nil {
+               fmt.Printf("Error: failed to close nodes session [%s].\n", err)
+               errorStrings = append(errorStrings, err.Error())
+       }
+       if len(errorStrings) != 0 {
+               return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
        }
+       return nil
 }
diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go
index a7793bb4d..83c49511d 100644
--- a/admin-cli/executor/disk_info.go
+++ b/admin-cli/executor/disk_info.go
@@ -45,7 +45,7 @@ func QueryDiskInfo(client *Client, infoType DiskInfoType, 
replicaServer string,
 }
 
 func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, 
tableName string, diskTag string, print bool) ([]interface{}, error) {
-       resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName)
+       resp, err := SendQueryDiskInfoRequest(client, replicaServer, tableName)
        if err != nil {
                return nil, err
        }
@@ -60,7 +60,7 @@ func GetDiskInfo(client *Client, infoType DiskInfoType, 
replicaServer string, ta
        }
 }
 
-func sendQueryDiskInfoRequest(client *Client, replicaServer string, tableName 
string) (*radmin.QueryDiskInfoResponse, error) {
+func SendQueryDiskInfoRequest(client *Client, replicaServer string, tableName 
string) (*radmin.QueryDiskInfoResponse, error) {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
        defer cancel()
 
@@ -88,7 +88,7 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) 
(map[string]*radmin
        }
        for _, nodeInfo := range nodeInfos {
                address := nodeInfo.GetAddress().GetAddress()
-               resp, err := sendQueryDiskInfoRequest(client, address, 
tableName)
+               resp, err := SendQueryDiskInfoRequest(client, address, 
tableName)
                if err != nil {
                        return respMap, err
                }
diff --git a/admin-cli/executor/toolkits/tablemigrator/switcher.go 
b/admin-cli/executor/toolkits/tablemigrator/switcher.go
index 76c223a98..41f2be6d0 100644
--- a/admin-cli/executor/toolkits/tablemigrator/switcher.go
+++ b/admin-cli/executor/toolkits/tablemigrator/switcher.go
@@ -59,7 +59,8 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, 
zkRoot string, tabl
 
        originMeta := client.Meta
        targetAddrList := strings.Split(targetAddrs, ",")
-       targetMeta := executor.NewClient(os.Stdout, targetAddrList).Meta
+       pegasusClient, _ := executor.NewClient(os.Stdout, targetAddrList, true)
+       targetMeta := pegasusClient.Meta
        env := map[string]string{
                "replica.deny_client_request": "reconfig*all",
        }
diff --git a/admin-cli/util/pegasus_node.go b/admin-cli/util/pegasus_node.go
index 5f7320820..e0554008f 100644
--- a/admin-cli/util/pegasus_node.go
+++ b/admin-cli/util/pegasus_node.go
@@ -84,6 +84,13 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress {
        return base.NewRPCAddress(n.IP, n.Port)
 }
 
+func (n *PegasusNode) Close() error {
+       if n.session != nil {
+               return n.session.Close()
+       }
+       return nil
+}
+
 // NewNodeFromTCPAddr creates a node from tcp address.
 // NOTE:
 //   - Will not initialize TCP connection unless needed.
@@ -211,3 +218,17 @@ func (m *PegasusNodeManager) GetPerfSession(addr string, 
ntype session.NodeType)
 
        return aggregate.WrapPerf(addr, node.session)
 }
+
+func (m *PegasusNodeManager) CloseAllNodes() error {
+       var errorStrings []string
+       for _, n := range m.nodes {
+               err := n.Close()
+               if err != nil {
+                       errorStrings = append(errorStrings, err.Error())
+               }
+       }
+       if len(errorStrings) != 0 {
+               return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
+       }
+       return nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to