This is an automated email from the ASF dual-hosted git repository.

gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new b0e4c596d refactor(go-client): refactor admin APIs for meta by reflection (#1929)
b0e4c596d is described below

commit b0e4c596dbf096d454846d386567a740d5f87fbd
Author: Dan Wang <[email protected]>
AuthorDate: Fri Mar 8 10:12:22 2024 +0800

    refactor(go-client): refactor admin APIs for meta by reflection (#1929)
---
 .github/workflows/lint_and_test_go-client.yml |   2 +-
 go-client/admin/client.go                     | 192 +++++++++++++++++++-------
 go-client/admin/client_test.go                |  57 ++++----
 go-client/idl/base/error_code.go              |  34 +++++
 4 files changed, 211 insertions(+), 74 deletions(-)

diff --git a/.github/workflows/lint_and_test_go-client.yml b/.github/workflows/lint_and_test_go-client.yml
index 79d9e12b0..4e02501f1 100644
--- a/.github/workflows/lint_and_test_go-client.yml
+++ b/.github/workflows/lint_and_test_go-client.yml
@@ -78,7 +78,7 @@ jobs:
       - name: Lint
         uses: golangci/golangci-lint-action@v3
         with:
-          version: v1.55.2
+          version: v1.56.2
           working-directory: ./go-client
 
   build_server:
diff --git a/go-client/admin/client.go b/go-client/admin/client.go
index d5848560f..77851f477 100644
--- a/go-client/admin/client.go
+++ b/go-client/admin/client.go
@@ -22,6 +22,7 @@ package admin
 import (
        "context"
        "fmt"
+       "reflect"
        "time"
 
        "github.com/apache/incubator-pegasus/go-client/idl/admin"
@@ -33,51 +34,109 @@ import (
 // Client provides the administration API to a specific cluster.
 // Remember only the superusers configured to the cluster have the admin 
priviledges.
 type Client interface {
-       CreateTable(ctx context.Context, tableName string, partitionCount int, 
successIfExist_optional ...bool) error
-
-       DropTable(ctx context.Context, tableName string) error
-
-       ListTables(ctx context.Context) ([]*TableInfo, error)
-}
-
-// TableInfo is the table information.
-type TableInfo struct {
-       Name string
-
-       // Envs is a set of attributes binding to this table.
-       Envs map[string]string
+       Close() error
+
+       // The timeout specify the max duration that is spent on an client 
request. For
+       // example, if the client is based on RPC, it would be the timeout for 
the RPC
+       // request.
+       GetTimeout() time.Duration
+       SetTimeout(timeout time.Duration)
+
+       // `maxWaitSeconds` specify the number of seconds that is spent on 
waiting for
+       // the created table to be ready. This method would return error once 
the table
+       // is still not ready after `maxWaitSeconds`. The administrator should 
check if
+       // there is something wrong with the table.
+       CreateTable(tableName string, partitionCount int32, replicaCount int32, 
envs map[string]string, maxWaitSeconds int32, successIfExistOptional ...bool) 
(int32, error)
+
+       // `reserveSeconds` specify the retention interval for a table before 
it is actually dropped.
+       DropTable(tableName string, reserveSeconds int64) error
+
+       // Empty `args` means "list all available tables"; Otherwise, the only 
parameter would
+       // specify the status of the returned tables.
+       ListTables(args ...interface{}) ([]*replication.AppInfo, error)
 }
 
 type Config struct {
        MetaServers []string `json:"meta_servers"`
+       Timeout     time.Duration
 }
 
 // NewClient returns an instance of Client.
 func NewClient(cfg Config) Client {
        return &rpcBasedClient{
-               metaManager: session.NewMetaManager(cfg.MetaServers, 
session.NewNodeSession),
+               meta:       session.NewMetaManager(cfg.MetaServers, 
session.NewNodeSession),
+               rpcTimeout: cfg.Timeout,
        }
 }
 
 type rpcBasedClient struct {
-       metaManager *session.MetaManager
+       meta       *session.MetaManager
+       rpcTimeout time.Duration
+}
+
+func (c *rpcBasedClient) Close() error {
+       return c.meta.Close()
+}
+
+func (c *rpcBasedClient) GetTimeout() time.Duration {
+       return c.rpcTimeout
 }
 
-func (c *rpcBasedClient) waitTableReady(ctx context.Context, tableName string, 
partitionCount int) error {
-       const replicaCount int = 3
+func (c *rpcBasedClient) SetTimeout(timeout time.Duration) {
+       c.rpcTimeout = timeout
+}
+
+// Call RPC methods(go-client/session/admin_rpc_types.go) of 
session.MetaManager by reflection.
+// `req` and `resp` are the request and response structs of RPC. `callback` 
always accepts
+// non-nil `resp`.
+func (c *rpcBasedClient) callMeta(methodName string, req interface{}, callback 
func(resp interface{})) error {
+       ctx, cancel := context.WithTimeout(context.Background(), c.rpcTimeout)
+       defer cancel()
+
+       // There are 2 kinds of structs for the result which could be processed:
+       // * error
+       // * (response, error)
+       result := 
reflect.ValueOf(c.meta).MethodByName(methodName).Call([]reflect.Value{
+               reflect.ValueOf(ctx),
+               reflect.ValueOf(req),
+       })
+
+       // The last element must be error.
+       ierr := result[len(result)-1].Interface()
+
+       var err error
+       if ierr != nil {
+               err = ierr.(error)
+       }
+
+       if len(result) == 1 {
+               return err
+       }
+
+       // The struct of result must be (response, error).
+       if !result[0].IsNil() {
+               callback(result[0].Interface())
+       }
+
+       return err
+}
 
-       for {
-               resp, err := c.metaManager.QueryConfig(ctx, tableName)
+func (c *rpcBasedClient) waitTableReady(tableName string, partitionCount 
int32, replicaCount int32, maxWaitSeconds int32) error {
+       for ; maxWaitSeconds > 0; maxWaitSeconds-- {
+               var resp *replication.QueryCfgResponse
+               err := c.callMeta("QueryConfig", tableName, func(iresp 
interface{}) {
+                       resp = iresp.(*replication.QueryCfgResponse)
+               })
                if err != nil {
                        return err
                }
                if resp.GetErr().Errno != base.ERR_OK.String() {
-                       return fmt.Errorf("QueryConfig failed: %s", 
resp.GetErr().String())
+                       return fmt.Errorf("QueryConfig failed: %s", 
base.GetResponseError(resp))
                }
 
-               readyCount := 0
+               readyCount := int32(0)
                for _, part := range resp.Partitions {
-                       if part.Primary.GetRawAddress() != 0 && 
len(part.Secondaries)+1 == replicaCount {
+                       if part.Primary.GetRawAddress() != 0 && 
int32(len(part.Secondaries)+1) == replicaCount {
                                readyCount++
                        }
                }
@@ -86,55 +145,92 @@ func (c *rpcBasedClient) waitTableReady(ctx context.Context, tableName string, p
                }
                time.Sleep(time.Second)
        }
+
+       if maxWaitSeconds <= 0 {
+               return fmt.Errorf("After %d seconds, table '%s' is still not 
ready", maxWaitSeconds, tableName)
+       }
+
        return nil
 }
 
-func (c *rpcBasedClient) CreateTable(ctx context.Context, tableName string, 
partitionCount int, successIfExist_optional ...bool) error {
+func (c *rpcBasedClient) CreateTable(tableName string, partitionCount int32, 
replicaCount int32, envs map[string]string, maxWaitSeconds int32, 
successIfExistOptional ...bool) (int32, error) {
        successIfExist := true
-       if len(successIfExist_optional) > 0 {
-               successIfExist = successIfExist_optional[0]
+       if len(successIfExistOptional) > 0 {
+               successIfExist = successIfExistOptional[0]
        }
-       _, err := c.metaManager.CreateApp(ctx, 
&admin.ConfigurationCreateAppRequest{
+
+       req := &admin.ConfigurationCreateAppRequest{
                AppName: tableName,
                Options: &admin.CreateAppOptions{
                        PartitionCount: int32(partitionCount),
-                       ReplicaCount:   3,
+                       ReplicaCount:   replicaCount,
                        SuccessIfExist: successIfExist,
                        AppType:        "pegasus",
-                       Envs:           make(map[string]string),
                        IsStateful:     true,
+                       Envs:           envs,
+               }}
+
+       var appID int32
+       var respErr error
+       err := c.callMeta("CreateApp", req, func(iresp interface{}) {
+               resp := iresp.(*admin.ConfigurationCreateAppResponse)
+               appID = resp.Appid
+               respErr = base.GetResponseError(resp)
+       })
+       if err != nil {
+               return appID, err
+       }
+
+       err = c.waitTableReady(tableName, partitionCount, replicaCount, 
maxWaitSeconds)
+       if err != nil {
+               return appID, err
+       }
+
+       return appID, respErr
+}
+
+func (c *rpcBasedClient) DropTable(tableName string, reserveSeconds int64) 
error {
+       req := &admin.ConfigurationDropAppRequest{
+               AppName: tableName,
+               Options: &admin.DropAppOptions{
+                       SuccessIfNotExist: true,
+                       ReserveSeconds:    &reserveSeconds, // Optional for 
thrift
                },
+       }
+
+       var respErr error
+       err := c.callMeta("DropApp", req, func(iresp interface{}) {
+               respErr = 
base.GetResponseError(iresp.(*admin.ConfigurationDropAppResponse))
        })
        if err != nil {
                return err
        }
-       err = c.waitTableReady(ctx, tableName, partitionCount)
-       return err
+
+       return respErr
 }
 
-func (c *rpcBasedClient) DropTable(ctx context.Context, tableName string) 
error {
-       req := admin.NewConfigurationDropAppRequest()
-       req.AppName = tableName
-       reserveSeconds := int64(1) // delete immediately. the caller is 
responsible for the soft deletion of table.
-       req.Options = &admin.DropAppOptions{
-               SuccessIfNotExist: true,
-               ReserveSeconds:    &reserveSeconds,
+func (c *rpcBasedClient) listTables(status replication.AppStatus) 
([]*replication.AppInfo, error) {
+       req := &admin.ConfigurationListAppsRequest{
+               Status: status,
        }
-       _, err := c.metaManager.DropApp(ctx, req)
-       return err
-}
 
-func (c *rpcBasedClient) ListTables(ctx context.Context) ([]*TableInfo, error) 
{
-       resp, err := c.metaManager.ListApps(ctx, 
&admin.ConfigurationListAppsRequest{
-               Status: replication.AppStatus_AS_AVAILABLE,
+       var tables []*replication.AppInfo
+       var respErr error
+       err := c.callMeta("ListApps", req, func(iresp interface{}) {
+               resp := iresp.(*admin.ConfigurationListAppsResponse)
+               tables = resp.Infos
+               respErr = base.GetResponseError(resp)
        })
        if err != nil {
-               return nil, err
+               return tables, err
        }
 
-       var results []*TableInfo
-       for _, app := range resp.Infos {
-               results = append(results, &TableInfo{Name: app.AppName, Envs: 
app.Envs})
+       return tables, respErr
+}
+
+func (c *rpcBasedClient) ListTables(args ...interface{}) 
([]*replication.AppInfo, error) {
+       if len(args) == 0 {
+               return c.listTables(replication.AppStatus_AS_AVAILABLE)
        }
-       return results, nil
+       return c.listTables(args[0].(replication.AppStatus))
 }
diff --git a/go-client/admin/client_test.go b/go-client/admin/client_test.go
index 3d6ee3ddf..8fdd13a4f 100644
--- a/go-client/admin/client_test.go
+++ b/go-client/admin/client_test.go
@@ -25,51 +25,62 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/incubator-pegasus/go-client/idl/replication"
        "github.com/apache/incubator-pegasus/go-client/pegasus"
        "github.com/stretchr/testify/assert"
 )
 
-func TestAdmin_Table(t *testing.T) {
-       c := NewClient(Config{
+const (
+       replicaCount   = 3
+       maxWaitSeconds = 600
+       reserveSeconds = 1
+)
+
+func defaultConfig() Config {
+       return Config{
                MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", 
"0.0.0.0:34603"},
-       })
+               Timeout:     30 * time.Second,
+       }
+}
+
+func TestAdmin_Table(t *testing.T) {
+       c := NewClient(defaultConfig())
 
-       hasTable := func(tables []*TableInfo, tableName string) bool {
+       hasTable := func(tables []*replication.AppInfo, tableName string) bool {
                for _, tb := range tables {
-                       if tb.Name == tableName {
+                       if tb.AppName == tableName {
                                return true
                        }
                }
                return false
        }
 
-       err := c.DropTable(context.Background(), "admin_table_test")
+       err := c.DropTable("admin_table_test", reserveSeconds)
        assert.Nil(t, err)
 
        // no such table after deletion
-       tables, err := c.ListTables(context.Background())
+       tables, err := c.ListTables()
        assert.Nil(t, err)
        assert.False(t, hasTable(tables, "admin_table_test"))
 
-       err = c.CreateTable(context.Background(), "admin_table_test", 16)
+       _, err = c.CreateTable("admin_table_test", 16, replicaCount, 
make(map[string]string), maxWaitSeconds)
        assert.Nil(t, err)
 
-       tables, err = c.ListTables(context.Background())
+       tables, err = c.ListTables()
        assert.Nil(t, err)
        assert.True(t, hasTable(tables, "admin_table_test"))
 
-       err = c.DropTable(context.Background(), "admin_table_test")
+       err = c.DropTable("admin_table_test", reserveSeconds)
        assert.Nil(t, err)
 }
 
 func TestAdmin_ListTablesTimeout(t *testing.T) {
        c := NewClient(Config{
                MetaServers: []string{"0.0.0.0:123456"},
+               Timeout:     500 * time.Millisecond,
        })
 
-       ctx, cancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
-       defer cancel()
-       _, err := c.ListTables(ctx)
+       _, err := c.ListTables()
        assert.Equal(t, err, context.DeadlineExceeded)
 }
 
@@ -77,14 +88,9 @@ func TestAdmin_ListTablesTimeout(t *testing.T) {
 func TestAdmin_CreateTableMustAvailable(t *testing.T) {
        const tableName = "admin_table_test"
 
-       c := NewClient(Config{
-               MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", 
"0.0.0.0:34603"},
-       })
-
-       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-       defer cancel()
+       c := NewClient(defaultConfig())
 
-       err := c.CreateTable(context.Background(), tableName, 8)
+       _, err := c.CreateTable(tableName, 8, replicaCount, 
make(map[string]string), maxWaitSeconds)
        if !assert.NoError(t, err) {
                assert.Fail(t, err.Error())
        }
@@ -115,24 +121,25 @@ func TestAdmin_CreateTableMustAvailable(t *testing.T) {
                }
        }
 
+       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+       defer cancel()
+
        err = tb.Set(ctx, []byte("a"), []byte("a"), []byte("a"))
        if !assert.NoError(t, err) {
                assert.Fail(t, err.Error())
        }
 
        // cleanup
-       err = c.DropTable(context.Background(), tableName)
+       err = c.DropTable(tableName, reserveSeconds)
        if !assert.NoError(t, err) {
                assert.Fail(t, err.Error())
        }
 }
 
 func TestAdmin_GetAppEnvs(t *testing.T) {
-       c := NewClient(Config{
-               MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", 
"0.0.0.0:34603"},
-       })
+       c := NewClient(defaultConfig())
 
-       tables, err := c.ListTables(context.Background())
+       tables, err := c.ListTables()
        assert.Nil(t, err)
        for _, tb := range tables {
                assert.Empty(t, tb.Envs)
diff --git a/go-client/idl/base/error_code.go b/go-client/idl/base/error_code.go
index f70404df3..c105cb48b 100644
--- a/go-client/idl/base/error_code.go
+++ b/go-client/idl/base/error_code.go
@@ -21,6 +21,7 @@ package base
 
 import (
        "fmt"
+       "reflect"
 
        "github.com/apache/thrift/lib/go/thrift"
 )
@@ -136,6 +137,39 @@ func (ec *ErrorCode) String() string {
        return fmt.Sprintf("ErrorCode(%+v)", *ec)
 }
 
+type baseError struct {
+       message string
+}
+
+// Implement error interface.
+func (e *baseError) Error() string {
+       if e == nil || e.message == ERR_OK.String() {
+               return ERR_OK.String()
+       }
+       return e.message
+}
+
+// Convert ErrorCode to error.
+func (ec *ErrorCode) AsError() error {
+       if ec == nil || ec.Errno == ERR_OK.String() {
+               return nil
+       }
+       return &baseError{
+               message: ec.Errno,
+       }
+}
+
+// `resp` is the thrift-generated response struct of RPC.
+func GetResponseError(resp interface{}) error {
+       result := 
reflect.ValueOf(resp).MethodByName("GetErr").Call([]reflect.Value{})
+       iec := result[0].Interface()
+       if iec == nil {
+               return nil
+       }
+
+       return iec.(*ErrorCode).AsError()
+}
+
 //go:generate enumer -type=RocksDBErrCode -output=rocskdb_err_string.go
 type RocksDBErrCode int32
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to