This is an automated email from the ASF dual-hosted git repository.

xikai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb-meta.git

commit 475647fce2a7876cdf133b894a93dc10c05b0027
Author: CooooolFrog <[email protected]>
AuthorDate: Thu Dec 7 17:09:13 2023 +0800

    refactor: refactor create table procedure (#288)
    
    ## Rationale
    Because of a flaw in how the create table procedure is implemented,
    ceresmeta currently cannot return table-creation error messages to
    ceresdb. The procedure needs to be refactored to fix this.
    
    ## Detailed Changes
    * Refactor the create table procedure so that it can return error
    information correctly.
    * In the gRPC service, pass the actual error text (`err.Error()`) to
    `responseHeader` instead of the fixed string "create table".
    
    ## Test Plan
    Pass CI.
---
 .../procedure/ddl/createtable/create_table.go      | 120 ++++++++++++---------
 server/service/grpc/service.go                     |  10 +-
 2 files changed, 74 insertions(+), 56 deletions(-)

diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go 
b/server/coordinator/procedure/ddl/createtable/create_table.go
index 414caaf..49138a4 100644
--- a/server/coordinator/procedure/ddl/createtable/create_table.go
+++ b/server/coordinator/procedure/ddl/createtable/create_table.go
@@ -37,30 +37,30 @@ import (
 )
 
 const (
-       eventPrepare = "EventPrepare"
-       eventFailed  = "EventFailed"
-       eventSuccess = "EventSuccess"
-
-       stateBegin   = "StateBegin"
-       stateWaiting = "StateWaiting"
-       stateFinish  = "StateFinish"
-       stateFailed  = "StateFailed"
+       eventCreateMetadata = "EventCreateMetadata"
+       eventCreateOnShard  = "EventCreateOnShard"
+       eventFinish         = "EventFinish"
+
+       stateBegin          = "StateBegin"
+       stateCreateMetadata = "StateCreateMetadata"
+       stateCreateOnShard  = "StateCreateOnShard"
+       stateFinish         = "StateFinish"
 )
 
 var (
        createTableEvents = fsm.Events{
-               {Name: eventPrepare, Src: []string{stateBegin}, Dst: 
stateWaiting},
-               {Name: eventSuccess, Src: []string{stateWaiting}, Dst: 
stateFinish},
-               {Name: eventFailed, Src: []string{stateWaiting}, Dst: 
stateFailed},
+               {Name: eventCreateMetadata, Src: []string{stateBegin}, Dst: 
stateCreateMetadata},
+               {Name: eventCreateOnShard, Src: []string{stateCreateMetadata}, 
Dst: stateCreateOnShard},
+               {Name: eventFinish, Src: []string{stateCreateOnShard}, Dst: 
stateFinish},
        }
        createTableCallbacks = fsm.Callbacks{
-               eventPrepare: prepareCallback,
-               eventFailed:  failedCallback,
-               eventSuccess: successCallback,
+               eventCreateMetadata: createMetadataCallback,
+               eventCreateOnShard:  createOnShard,
+               eventFinish:         createFinish,
        }
 )
 
-func prepareCallback(event *fsm.Event) {
+func createMetadataCallback(event *fsm.Event) {
        req, err := procedure.GetRequestFromEvent[*callbackRequest](event)
        if err != nil {
                procedure.CancelEventWithLog(event, err, "get request from 
event")
@@ -73,44 +73,67 @@ func prepareCallback(event *fsm.Event) {
                TableName:     params.SourceReq.GetName(),
                PartitionInfo: storage.PartitionInfo{Info: 
params.SourceReq.PartitionTableInfo.GetPartitionInfo()},
        }
-       result, err := params.ClusterMetadata.CreateTableMetadata(req.ctx, 
createTableMetadataRequest)
+       _, err = params.ClusterMetadata.CreateTableMetadata(req.ctx, 
createTableMetadataRequest)
        if err != nil {
                procedure.CancelEventWithLog(event, err, "create table 
metadata")
                return
        }
 
        log.Debug("create table metadata finish", zap.String("tableName", 
createTableMetadataRequest.TableName))
+}
+
+func createOnShard(event *fsm.Event) {
+       req, err := procedure.GetRequestFromEvent[*callbackRequest](event)
+       if err != nil {
+               procedure.CancelEventWithLog(event, err, "get request from 
event")
+               return
+       }
+       params := req.p.params
+
+       table, ok, err := 
params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), 
params.SourceReq.GetName())
+       if err != nil {
+               procedure.CancelEventWithLog(event, err, "get table metadata 
failed", zap.String("schemaName", params.SourceReq.GetSchemaName()), 
zap.String("tableName", params.SourceReq.GetName()))
+               return
+       }
+       if !ok {
+               procedure.CancelEventWithLog(event, err, "table metadata not 
found", zap.String("schemaName", params.SourceReq.GetSchemaName()), 
zap.String("tableName", params.SourceReq.GetName()))
+               return
+       }
 
        shardVersionUpdate := metadata.ShardVersionUpdate{
                ShardID:       params.ShardID,
                LatestVersion: 
req.p.relatedVersionInfo.ShardWithVersion[params.ShardID],
        }
 
-       createTableRequest := ddl.BuildCreateTableRequest(result.Table, 
shardVersionUpdate, params.SourceReq)
+       createTableRequest := ddl.BuildCreateTableRequest(table, 
shardVersionUpdate, params.SourceReq)
        latestShardVersion, err := ddl.CreateTableOnShard(req.ctx, 
params.ClusterMetadata, params.Dispatch, params.ShardID, createTableRequest)
        if err != nil {
                procedure.CancelEventWithLog(event, err, "dispatch create table 
on shard")
                return
        }
 
-       log.Debug("dispatch createTableOnShard finish", zap.String("tableName", 
createTableMetadataRequest.TableName))
+       log.Debug("dispatch createTableOnShard finish", zap.String("tableName", 
table.Name))
+
+       shardVersionUpdate = metadata.ShardVersionUpdate{
+               ShardID:       params.ShardID,
+               LatestVersion: latestShardVersion,
+       }
 
-       shardVersionUpdate.LatestVersion = latestShardVersion
-       err = params.ClusterMetadata.AddTableTopology(req.ctx, 
shardVersionUpdate, result.Table)
+       err = params.ClusterMetadata.AddTableTopology(req.ctx, 
shardVersionUpdate, table)
        if err != nil {
                procedure.CancelEventWithLog(event, err, "add table topology")
                return
        }
 
-       log.Debug("add table topology finish", zap.String("tableName", 
createTableMetadataRequest.TableName))
-
        req.createTableResult = &metadata.CreateTableResult{
-               Table:              result.Table,
+               Table:              table,
                ShardVersionUpdate: shardVersionUpdate,
        }
+
+       log.Debug("add table topology finish", zap.String("tableName", 
table.Name))
 }
 
-func successCallback(event *fsm.Event) {
+func createFinish(event *fsm.Event) {
        req, err := procedure.GetRequestFromEvent[*callbackRequest](event)
        if err != nil {
                procedure.CancelEventWithLog(event, err, "get request from 
event")
@@ -123,18 +146,6 @@ func successCallback(event *fsm.Event) {
        }
 }
 
-func failedCallback(event *fsm.Event) {
-       req, err := procedure.GetRequestFromEvent[*callbackRequest](event)
-       if err != nil {
-               procedure.CancelEventWithLog(event, err, "get request from 
event")
-               return
-       }
-
-       if err := req.p.params.OnFailed(event.Err); err != nil {
-               log.Error("exec failed callback failed")
-       }
-}
-
 // callbackRequest is fsm callbacks param.
 type callbackRequest struct {
        ctx context.Context
@@ -179,6 +190,7 @@ type Procedure struct {
        fsm                *fsm.FSM
        params             ProcedureParams
        relatedVersionInfo procedure.RelatedVersionInfo
+
        // Protect the state.
        lock  sync.RWMutex
        state procedure.State
@@ -217,29 +229,35 @@ func (p *Procedure) Kind() procedure.Kind {
 func (p *Procedure) Start(ctx context.Context) error {
        p.updateState(procedure.StateRunning)
 
+       // Try to load persist data.
        req := &callbackRequest{
                ctx:               ctx,
                p:                 p,
                createTableResult: nil,
        }
 
-       if err := p.fsm.Event(eventPrepare, req); err != nil {
-               err1 := p.fsm.Event(eventFailed, req)
-               p.updateState(procedure.StateFailed)
-               if err1 != nil {
-                       err = errors.WithMessagef(err, "send eventFailed, 
err:%v", err1)
+       for {
+               switch p.fsm.Current() {
+               case stateBegin:
+                       if err := p.fsm.Event(eventCreateMetadata, req); err != 
nil {
+                               _ = p.params.OnFailed(err)
+                               return err
+                       }
+               case stateCreateMetadata:
+                       if err := p.fsm.Event(eventCreateOnShard, req); err != 
nil {
+                               _ = p.params.OnFailed(err)
+                               return err
+                       }
+               case stateCreateOnShard:
+                       if err := p.fsm.Event(eventFinish, req); err != nil {
+                               _ = p.params.OnFailed(err)
+                               return err
+                       }
+               case stateFinish:
+                       p.updateState(procedure.StateFinished)
+                       return nil
                }
-               _ = p.params.OnFailed(err)
-               return errors.WithMessage(err, "send eventPrepare")
        }
-
-       if err := p.fsm.Event(eventSuccess, req); err != nil {
-               _ = p.params.OnFailed(err)
-               return errors.WithMessage(err, "send eventSuccess")
-       }
-
-       p.updateState(procedure.StateFinished)
-       return nil
 }
 
 func (p *Procedure) Cancel(_ context.Context) error {
diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go
index b5f64f4..d4693de 100644
--- a/server/service/grpc/service.go
+++ b/server/service/grpc/service.go
@@ -174,7 +174,7 @@ func (s *Service) CreateTable(ctx context.Context, req 
*metaservicepb.CreateTabl
 
        metaClient, err := s.getForwardedMetaClient(ctx)
        if err != nil {
-               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, "create table")}, nil
+               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, err.Error())}, nil
        }
 
        // Forward request to the leader.
@@ -188,7 +188,7 @@ func (s *Service) CreateTable(ctx context.Context, req 
*metaservicepb.CreateTabl
        c, err := clusterManager.GetCluster(ctx, 
req.GetHeader().GetClusterName())
        if err != nil {
                log.Error("fail to create table", zap.Error(err))
-               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, "create table")}, nil
+               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, err.Error())}, nil
        }
 
        errorCh := make(chan error, 1)
@@ -211,13 +211,13 @@ func (s *Service) CreateTable(ctx context.Context, req 
*metaservicepb.CreateTabl
        })
        if err != nil {
                log.Error("fail to create table, factory create procedure", 
zap.Error(err))
-               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, "create table")}, nil
+               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, err.Error())}, nil
        }
 
        err = c.GetProcedureManager().Submit(ctx, p)
        if err != nil {
                log.Error("fail to create table, manager submit procedure", 
zap.Error(err))
-               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, "create table")}, nil
+               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, err.Error())}, nil
        }
 
        select {
@@ -239,7 +239,7 @@ func (s *Service) CreateTable(ctx context.Context, req 
*metaservicepb.CreateTabl
                }, nil
        case err = <-errorCh:
                log.Warn("create table failed", zap.String("tableName", 
req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()), 
zap.Error(err))
-               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, "create table")}, nil
+               return &metaservicepb.CreateTableResponse{Header: 
responseHeader(err, err.Error())}, nil
        }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to