This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/saga by this push:
new fe136633 Support Saga mode config initialization (#871)
fe136633 is described below
commit fe136633225a25436713a4883fcc55fff5021e18
Author: flypiggy <[email protected]>
AuthorDate: Tue Sep 9 10:07:13 2025 +0800
Support Saga mode config initialization (#871)
* remove redundant logic and methods
Make the loadconfig method only responsible for loading static parameters
and remove redundant memory storage
* add the init method and refine other logic
* remove redundant logic and methods
Make the loadconfig method only responsible for loading static parameters
and remove redundant memory storage
* add the init method and refine other logic
* update
* upd
---------
Co-authored-by: FengZhang <[email protected]>
---
.../engine/config/default_statemachine_config.go | 78 +++++---
.../config/default_statemachine_config_test.go | 208 ++++++++++++--------
.../engine/config/testdata/order_saga.json | 42 ++--
.../engine/config/testdata/order_saga.yaml | 46 ++---
.../{order_saga.json => order_saga_v2.json} | 104 +++++++---
.../engine/config/testdata/order_saga_v2.yaml | 213 +++++++++++++++++++++
.../engine/config/testdata/saga_config.json | 13 ++
.../engine/config/testdata/saga_config.yaml | 25 +++
.../engine/config/testdata/saga_config_v2.json | 13 ++
.../engine/config/testdata/saga_config_v2.yaml | 10 +
.../core/process_ctrl_statemachine_engine.go | 12 +-
.../engine/expr/expression_factory_manager.go | 5 +-
.../repo/repository/state_machine_repository.go | 10 +-
.../statemachine/engine/statemachine_config.go | 5 +-
14 files changed, 597 insertions(+), 187 deletions(-)
diff --git a/pkg/saga/statemachine/engine/config/default_statemachine_config.go
b/pkg/saga/statemachine/engine/config/default_statemachine_config.go
index f2902572..12225330 100644
--- a/pkg/saga/statemachine/engine/config/default_statemachine_config.go
+++ b/pkg/saga/statemachine/engine/config/default_statemachine_config.go
@@ -30,14 +30,12 @@ import (
"strings"
"sync"
- "github.com/seata/seata-go/pkg/saga/statemachine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/repo"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl"
- "github.com/seata/seata-go/pkg/saga/statemachine/statelang/parser"
"github.com/seata/seata-go/pkg/saga/statemachine/store"
)
@@ -62,9 +60,6 @@ type DefaultStateMachineConfig struct {
rmReportSuccessEnable bool
stateMachineResources []string
- // State machine definitions
- stateMachineDefs map[string]*statemachine.StateMachineObject
-
// Components
processController process_ctrl.ProcessController
@@ -284,8 +279,8 @@ func (c *DefaultStateMachineConfig)
SetRmReportSuccessEnable(rmReportSuccessEnab
c.rmReportSuccessEnable = rmReportSuccessEnable
}
-func (c *DefaultStateMachineConfig) GetStateMachineDefinition(name string)
*statemachine.StateMachineObject {
- return c.stateMachineDefs[name]
+func (c *DefaultStateMachineConfig) GetStateMachineConfig()
engine.StateMachineConfig {
+ return c
}
func (c *DefaultStateMachineConfig) GetExpressionFactory(expressionType
string) expr.ExpressionFactory {
@@ -364,28 +359,23 @@ func (c *DefaultStateMachineConfig) LoadConfig(configPath
string) error {
return fmt.Errorf("failed to read config file: path=%s,
error=%w", configPath, err)
}
- parser := parser.NewStateMachineConfigParser()
- smo, err := parser.Parse(content)
- if err != nil {
- return fmt.Errorf("failed to parse state machine definition:
path=%s, error=%w", configPath, err)
- }
-
var configFileParams ConfigFileParams
- if err := json.Unmarshal(content, &configFileParams); err != nil {
+ ext := strings.ToLower(filepath.Ext(configPath))
+
+ switch ext {
+ case ".json":
+ if err := json.Unmarshal(content, &configFileParams); err !=
nil {
+ return fmt.Errorf("failed to unmarshal config file as
JSON: %w", err)
+ }
+ case ".yaml", ".yml":
if err := yaml.Unmarshal(content, &configFileParams); err !=
nil {
return fmt.Errorf("failed to unmarshal config file as
YAML: %w", err)
- } else {
- c.applyConfigFileParams(&configFileParams)
}
- } else {
- c.applyConfigFileParams(&configFileParams)
- }
-
- if _, exists := c.stateMachineDefs[smo.Name]; exists {
- return fmt.Errorf("state machine definition with name %s
already exists", smo.Name)
+ default:
+ return fmt.Errorf("unsupported config file type: path=%s,
ext=%s (only .json/.yaml/.yml are supported)", configPath, ext)
}
- c.stateMachineDefs[smo.Name] = smo
+ c.applyConfigFileParams(&configFileParams)
return nil
}
@@ -517,6 +507,14 @@ func (c *DefaultStateMachineConfig) initServiceInvokers()
error {
c.RegisterServiceInvoker("func", invoker.NewFuncInvoker())
}
+ if c.scriptInvokerManager == nil {
+ c.scriptInvokerManager = invoker.NewScriptInvokerManager()
+ }
+
+ if jsInvoker, err := c.scriptInvokerManager.GetInvoker("javascript");
err != nil || jsInvoker == nil {
+
c.scriptInvokerManager.RegisterInvoker(invoker.NewJavaScriptScriptInvoker())
+ }
+
return nil
}
@@ -533,6 +531,10 @@ func (c *DefaultStateMachineConfig) Validate() error {
errs = append(errs, fmt.Errorf("service invoker manager is
nil"))
}
+ if c.scriptInvokerManager == nil {
+ errs = append(errs, fmt.Errorf("script invoker manager is nil"))
+ }
+
if c.transOperationTimeout <= 0 {
errs = append(errs, fmt.Errorf("invalid trans operation
timeout: %d", c.transOperationTimeout))
}
@@ -605,7 +607,7 @@ func (c *DefaultStateMachineConfig)
EvaluateExpression(expressionStr string, con
return result, nil
}
-func NewDefaultStateMachineConfig(opts ...Option) *DefaultStateMachineConfig {
+func NewDefaultStateMachineConfig(opts ...Option) (*DefaultStateMachineConfig,
error) {
ctx := context.Background()
defaultBP := process_ctrl.NewBusinessProcessor()
@@ -619,7 +621,6 @@ func NewDefaultStateMachineConfig(opts ...Option)
*DefaultStateMachineConfig {
sagaCompensatePersistModeUpdate:
DefaultClientSagaCompensatePersistModeUpdate,
sagaBranchRegisterEnable:
DefaultClientSagaBranchRegisterEnable,
rmReportSuccessEnable:
DefaultClientReportSuccessEnable,
- stateMachineDefs:
make(map[string]*statemachine.StateMachineObject),
componentLock: &sync.Mutex{},
seqGenerator: sequence.NewUUIDSeqGenerator(),
statusDecisionStrategy:
strategy.NewDefaultStatusDecisionStrategy(),
@@ -648,13 +649,11 @@ func NewDefaultStateMachineConfig(opts ...Option)
*DefaultStateMachineConfig {
opt(c)
}
- if err := c.LoadConfig("config.yaml"); err == nil {
- log.Printf("Successfully loaded config from config.yaml")
- } else {
- log.Printf("Failed to load config file (using default/env
values): %v", err)
+ if err := c.Init(); err != nil {
+ return nil, fmt.Errorf("failed to initialize state machine
config: %w", err)
}
- return c
+ return c, nil
}
func parseEnvResources() []string {
@@ -732,3 +731,22 @@ func WithStateMachineRepository(machineRepo
repo.StateMachineRepository) Option
c.stateMachineRepository = machineRepo
}
}
+
+func WithConfigPath(path string) Option {
+ return func(c *DefaultStateMachineConfig) {
+ if path == "" {
+ return
+ }
+ if err := c.LoadConfig(path); err != nil {
+ log.Printf("Failed to load config from %s: %v", path,
err)
+ } else {
+ log.Printf("Successfully loaded config from %s", path)
+ }
+ }
+}
+
+func WithScriptInvokerManager(scriptManager invoker.ScriptInvokerManager)
Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.scriptInvokerManager = scriptManager
+ }
+}
diff --git
a/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
b/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
index f3ad80ca..342f5c3b 100644
--- a/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
+++ b/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
@@ -18,95 +18,128 @@
package config
import (
- "errors"
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"io"
"os"
"path/filepath"
+ "reflect"
"testing"
- "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/stretchr/testify/assert"
)
+type TestStateMachineRepositoryMock struct {
+}
+
+func (m *TestStateMachineRepositoryMock) GetStateMachineById(stateMachineId
string) (statelang.StateMachine, error) {
+ return nil, errors.New("mock repository error")
+}
+
+func (m *TestStateMachineRepositoryMock)
GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
+ return nil, errors.New("mock repository error")
+}
+
+func (m *TestStateMachineRepositoryMock)
GetLastVersionStateMachine(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
+ return nil, errors.New("mock repository error")
+}
+
+func (m *TestStateMachineRepositoryMock) RegistryStateMachine(stateMachine
statelang.StateMachine) error {
+ return errors.New("mock registration error")
+}
+
+func (m *TestStateMachineRepositoryMock) RegistryStateMachineByReader(reader
io.Reader) error {
+ return errors.New("mock registration error")
+}
+
func TestDefaultStateMachineConfig_LoadValidJSON(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
- testFile := filepath.Join("testdata", "order_saga.json")
-
- err := config.LoadConfig(testFile)
- assert.NoError(t, err, "Loading JSON configuration should succeed")
+ testFile := filepath.Join("testdata", "saga_config.json")
+ config, err := NewDefaultStateMachineConfig(WithConfigPath(testFile))
+ assert.NoError(t, err, "Failed to initialize config")
+ assert.NotNil(t, config, "config is nil")
- smo := config.GetStateMachineDefinition("OrderSaga")
+ smo, err :=
config.stateMachineRepository.GetStateMachineByNameAndTenantId("OrderSaga", "")
+ assert.NoError(t, err)
assert.NotNil(t, smo, "State machine definition should not be nil")
- assert.Equal(t, "CreateOrder", smo.StartState, "The start state should
be correct")
- assert.Contains(t, smo.States, "CreateOrder", "The state node should
exist")
+ assert.Equal(t, "CreateOrder", smo.StartState(), "The start state
should be correct")
+ assert.Contains(t, smo.States(), "CreateOrder", "The state node should
exist")
assert.Equal(t, 30000, config.transOperationTimeout, "The timeout
should be read correctly")
}
func TestDefaultStateMachineConfig_LoadValidYAML(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
- testFile := filepath.Join("testdata", "order_saga.yaml")
+ testFile := filepath.Join("testdata", "saga_config.yaml")
+ config, err := NewDefaultStateMachineConfig(WithConfigPath(testFile))
+ assert.NoError(t, err, "Failed to initialize config")
+ assert.NotNil(t, config, "config is nil")
- err := config.LoadConfig(testFile)
- assert.NoError(t, err, "Loading YAML configuration should succeed")
+ smo, err :=
config.stateMachineRepository.GetStateMachineByNameAndTenantId("OrderSaga", "")
+ assert.NoError(t, err)
+ assert.NotNil(t, smo, "State machine definition should not be nil
(YAML)")
- smo := config.GetStateMachineDefinition("OrderSaga")
- assert.NotNil(t, smo)
+ assert.Equal(t, "CreateOrder", smo.StartState(), "The start state
should be correct (YAML)")
+ assert.Contains(t, smo.States(), "CreateOrder", "The state node should
exist (YAML)")
+ assert.Equal(t, 30000, config.transOperationTimeout, "The timeout
should be read correctly (YAML)")
}
func TestLoadNonExistentFile(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ :=
NewDefaultStateMachineConfig(WithConfigPath("non_existent.json"))
err := config.LoadConfig("non_existent.json")
assert.Error(t, err, "Loading a non-existent file should report an
error")
- assert.Contains(t, err.Error(), "failed to read config file", "The
error message should contain file read failure")
+ assert.Contains(t, err.Error(), "failed to read config file")
}
func TestGetStateMachineDefinition_Exists(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
- _ = config.LoadConfig(filepath.Join("testdata", "order_saga.json"))
+ config, _ :=
NewDefaultStateMachineConfig(WithConfigPath(filepath.Join("testdata",
"saga_config.json")))
- smo := config.GetStateMachineDefinition("OrderSaga")
+ smo, err :=
config.stateMachineRepository.GetStateMachineByNameAndTenantId("OrderSaga", "")
+ assert.NoError(t, err)
assert.NotNil(t, smo)
- assert.Equal(t, "1.0", smo.Version, "The version number should be
correct")
+ assert.Equal(t, "1.0", smo.Version(), "The version number should be
correct")
}
func TestGetNonExistentStateMachine(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
- smo := config.GetStateMachineDefinition("NonExistent")
- assert.Nil(t, smo, "An unloaded state machine should return nil")
+ config, _ := NewDefaultStateMachineConfig()
+ smo, err :=
config.stateMachineRepository.GetStateMachineByNameAndTenantId("NonExistent",
"")
+ assert.Error(t, err)
+ assert.True(t, smo == nil || reflect.ValueOf(smo).IsZero(), "An
unloaded state machine should return nil/zero")
}
func TestLoadDuplicateStateMachine(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
- testFile := filepath.Join("testdata", "order_saga.json")
-
- err := config.LoadConfig(testFile)
+ testFile := filepath.Join("testdata", "saga_config.json")
+ config, err := NewDefaultStateMachineConfig(WithConfigPath(testFile))
assert.NoError(t, err)
err = config.LoadConfig(testFile)
- assert.Error(t, err, "Duplicate loading should trigger a name conflict")
- assert.Contains(t, err.Error(), "already exists", "The error message
should contain a conflict prompt")
+ assert.NoError(t, err, "First load failed")
+
+ err = config.LoadConfig(testFile)
+ assert.NoError(t, err, "Duplicate load of identical config should not
error")
+
+ v2File := filepath.Join("testdata", "saga_config_v2.json")
+ err = config.LoadConfig(v2File)
+ assert.NoError(t, err, "Load of different version should succeed")
}
func TestRuntimeConfig_OverrideDefaults(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ := NewDefaultStateMachineConfig()
assert.Equal(t, "UTF-8", config.charset, "The default character set
should be UTF-8")
- _ = config.LoadConfig(filepath.Join("testdata", "order_saga.json"))
+ _, _ =
NewDefaultStateMachineConfig(WithConfigPath(filepath.Join("testdata",
"order_saga.json")))
assert.Equal(t, "UTF-8", config.charset, "If the configuration does not
specify, the default value should be used")
customConfig := &ConfigFileParams{
@@ -119,10 +152,7 @@ func TestRuntimeConfig_OverrideDefaults(t *testing.T) {
func TestGetDefaultExpressionFactory(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
-
- err := config.Init()
- assert.NoError(t, err, "Init should not return error")
+ config, _ := NewDefaultStateMachineConfig()
factory := config.GetExpressionFactory("el")
assert.NotNil(t, factory, "The default EL factory should exist")
@@ -134,12 +164,9 @@ func TestGetDefaultExpressionFactory(t *testing.T) {
func TestGetServiceInvoker(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
- if err := config.Init(); err != nil {
- t.Fatalf("init config failed: %v", err)
- }
+ config, _ :=
NewDefaultStateMachineConfig(WithConfigPath(filepath.Join("testdata",
"saga_config.json")))
- invoker := config.GetServiceInvoker("local")
+ invoker, _ := config.GetServiceInvoker("local")
if invoker == nil {
t.Errorf("expected non-nil invoker, got nil")
}
@@ -148,29 +175,29 @@ func TestGetServiceInvoker(t *testing.T) {
func TestLoadConfig_InvalidJSON(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ := NewDefaultStateMachineConfig()
testFile := filepath.Join("testdata", "invalid.json")
err := config.LoadConfig(testFile)
assert.Error(t, err, "Loading an invalid JSON configuration should
report an error")
- assert.Contains(t, err.Error(), "failed to parse state machine
definition", "The error message should contain parsing failure")
+ assert.Contains(t, err.Error(), "JSON", "The error message should
indicate JSON parsing failure")
}
func TestLoadConfig_InvalidYAML(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ := NewDefaultStateMachineConfig()
testFile := filepath.Join("testdata", "invalid.yaml")
err := config.LoadConfig(testFile)
assert.Error(t, err, "Loading an invalid YAML configuration should
report an error")
- assert.Contains(t, err.Error(), "failed to parse state machine
definition", "The error message should contain parsing failure")
+ assert.Contains(t, err.Error(), "yaml", "The error message should
indicate YAML parsing failure")
}
func TestRegisterStateMachineDef_Fail(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ := NewDefaultStateMachineConfig()
invalidResource := []string{"invalid_path.json"}
err := config.RegisterStateMachineDef(invalidResource)
@@ -181,7 +208,7 @@ func TestRegisterStateMachineDef_Fail(t *testing.T) {
func TestInit_ExpressionFactoryManagerNil(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ := NewDefaultStateMachineConfig()
config.expressionFactoryManager = nil
err := config.Init()
@@ -191,7 +218,7 @@ func TestInit_ExpressionFactoryManagerNil(t *testing.T) {
func TestInit_ServiceInvokerManagerNil(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ := NewDefaultStateMachineConfig()
config.serviceInvokerManager = nil
err := config.Init()
@@ -201,7 +228,7 @@ func TestInit_ServiceInvokerManagerNil(t *testing.T) {
func TestInit_StateMachineRepositoryNil(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ := NewDefaultStateMachineConfig()
config.stateMachineRepository = nil
err := config.Init()
@@ -211,7 +238,7 @@ func TestInit_StateMachineRepositoryNil(t *testing.T) {
func TestApplyRuntimeConfig_BoundaryValues(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
+ config, _ := NewDefaultStateMachineConfig()
customConfig := &ConfigFileParams{
TransOperationTimeout: 1,
ServiceInvokeTimeout: 1,
@@ -230,36 +257,67 @@ func TestApplyRuntimeConfig_BoundaryValues(t *testing.T) {
assert.Equal(t, maxTimeout, config.serviceInvokeTimeout, "The maximum
service invocation timeout should be correctly applied")
}
-type TestStateMachineRepositoryMock struct{}
-
-func (m *TestStateMachineRepositoryMock) GetStateMachineById(stateMachineId
string) (statelang.StateMachine, error) {
- return nil, errors.New("get state machine by id failed")
-}
-
-func (m *TestStateMachineRepositoryMock)
GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
- return nil, errors.New("get state machine by name and tenant id failed")
-}
-
-func (m *TestStateMachineRepositoryMock)
GetLastVersionStateMachine(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
- return nil, errors.New("get last version state machine failed")
-}
-
-func (m *TestStateMachineRepositoryMock) RegistryStateMachine(machine
statelang.StateMachine) error {
- return errors.New("registry state machine failed")
-}
-
-func (m *TestStateMachineRepositoryMock) RegistryStateMachineByReader(reader
io.Reader) error {
- return errors.New("registry state machine by reader failed")
-}
-
func TestRegisterStateMachineDef_RepositoryError(t *testing.T) {
os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
- config := NewDefaultStateMachineConfig()
- config.stateMachineRepository = &TestStateMachineRepositoryMock{}
+ config, _ :=
NewDefaultStateMachineConfig(WithStateMachineRepository(&TestStateMachineRepositoryMock{}))
resource := []string{filepath.Join("testdata", "order_saga.json")}
err := config.RegisterStateMachineDef(resource)
assert.Error(t, err, "Registration should fail when the state machine
repository reports an error")
assert.Contains(t, err.Error(), "register state machine from file
failed", "The error message should contain registration failure")
}
+
+func TestAllComponentsInitialization(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
+ config, err := NewDefaultStateMachineConfig()
+ assert.NoError(t, err, "Failed to create config instance")
+ assert.NotNil(t, config, "Config instance is nil")
+
+ err = config.Init()
+ assert.NoError(t, err, "Failed to initialize config")
+
+ t.Run("ProcessController", func(t *testing.T) {
+ assert.NotNil(t, config.processController, "Process controller
is not initialized")
+ })
+
+ t.Run("EventBusAndPublisher", func(t *testing.T) {
+ assert.NotNil(t, config.syncEventBus, "Sync event bus is not
initialized")
+ assert.NotNil(t, config.asyncEventBus, "Async event bus is not
initialized")
+ assert.NotNil(t, config.syncProcessCtrlEventPublisher, "Sync
event publisher is not initialized")
+ assert.NotNil(t, config.asyncProcessCtrlEventPublisher, "Async
event publisher is not initialized")
+ })
+
+ t.Run("StoreComponents", func(t *testing.T) {
+ assert.NotNil(t, config.stateLogRepository, "State log
repository is not initialized")
+ assert.NotNil(t, config.stateLogStore, "State log store is not
initialized")
+ assert.NotNil(t, config.stateLangStore, "State language store
is not initialized")
+ assert.NotNil(t, config.stateMachineRepository, "State machine
repository is not initialized")
+ })
+
+ t.Run("ExpressionComponents", func(t *testing.T) {
+ assert.NotNil(t, config.expressionFactoryManager, "Expression
factory manager is not initialized")
+ assert.NotNil(t, config.expressionResolver, "Expression
resolver is not initialized")
+
+ elFactory :=
config.expressionFactoryManager.GetExpressionFactory("el")
+ assert.NotNil(t, elFactory, "EL expression factory is not
registered")
+ })
+
+ t.Run("InvokerComponents", func(t *testing.T) {
+ assert.NotNil(t, config.serviceInvokerManager, "Service invoker
manager is not initialized")
+ assert.NotNil(t, config.scriptInvokerManager, "Script invoker
manager is not initialized")
+
+ localInvoker :=
config.serviceInvokerManager.ServiceInvoker("local")
+ assert.NotNil(t, localInvoker, "Local service invoker is not
registered")
+ })
+
+ t.Run("OtherCoreComponents", func(t *testing.T) {
+ assert.NotNil(t, config.statusDecisionStrategy, "Status
decision strategy is not initialized")
+ assert.NotNil(t, config.seqGenerator, "Sequence generator is
not initialized")
+ assert.NotNil(t, config.componentLock, "Component lock is not
initialized")
+
+ testID := config.seqGenerator.GenerateId("test-machine",
"test-tenant")
+ assert.NotEmpty(t, testID, "Sequence generator failed to
generate ID")
+ })
+}
diff --git a/pkg/saga/statemachine/engine/config/testdata/order_saga.json
b/pkg/saga/statemachine/engine/config/testdata/order_saga.json
index 691875cc..6c4eb4c8 100644
--- a/pkg/saga/statemachine/engine/config/testdata/order_saga.json
+++ b/pkg/saga/statemachine/engine/config/testdata/order_saga.json
@@ -6,9 +6,9 @@
"States": {
"CreateOrder": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "orderService",
- "serviceMethod": "createOrder",
+ "ServiceType": "local",
+ "ServiceName": "orderService",
+ "ServiceMethod": "createOrder",
"CompensateState": "CancelOrder",
"ForCompensation": false,
"ForUpdate": false,
@@ -56,9 +56,9 @@
},
"CheckStock": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "inventoryService",
- "serviceMethod": "checkAvailability",
+ "ServiceType": "local",
+ "ServiceName": "inventoryService",
+ "ServiceMethod": "checkAvailability",
"CompensateState": "RollbackStock",
"ForCompensation": false,
"ForUpdate": false,
@@ -114,9 +114,9 @@
},
"ReserveStock": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "inventoryService",
- "serviceMethod": "reserveItems",
+ "ServiceType": "local",
+ "ServiceName": "inventoryService",
+ "ServiceMethod": "reserveItems",
"CompensateState": "RollbackStock",
"ForCompensation": false,
"ForUpdate": false,
@@ -158,9 +158,9 @@
},
"ProcessPayment": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "paymentService",
- "serviceMethod": "processPayment",
+ "ServiceType": "local",
+ "ServiceName": "paymentService",
+ "ServiceMethod": "processPayment",
"CompensateState": "RefundPayment",
"ForCompensation": false,
"ForUpdate": false,
@@ -205,9 +205,9 @@
},
"CancelOrder": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "orderService",
- "serviceMethod": "cancelOrder",
+ "ServiceType": "local",
+ "ServiceName": "orderService",
+ "ServiceMethod": "cancelOrder",
"ForCompensation": true,
"ForUpdate": true,
"Input": [
@@ -222,9 +222,9 @@
},
"RollbackStock": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "inventoryService",
- "serviceMethod": "releaseItems",
+ "ServiceType": "local",
+ "ServiceName": "inventoryService",
+ "ServiceMethod": "releaseItems",
"ForCompensation": true,
"ForUpdate": true,
"Input": [
@@ -242,9 +242,9 @@
},
"RefundPayment": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "paymentService",
- "serviceMethod": "refundPayment",
+ "ServiceType": "local",
+ "ServiceName": "paymentService",
+ "ServiceMethod": "refundPayment",
"ForCompensation": true,
"ForUpdate": true,
"Input": [
diff --git a/pkg/saga/statemachine/engine/config/testdata/order_saga.yaml
b/pkg/saga/statemachine/engine/config/testdata/order_saga.yaml
index 39439d20..6f29ec6d 100644
--- a/pkg/saga/statemachine/engine/config/testdata/order_saga.yaml
+++ b/pkg/saga/statemachine/engine/config/testdata/order_saga.yaml
@@ -19,9 +19,9 @@ trans_operation_timeout: 30000
States:
CreateOrder:
Type: "ServiceTask"
- serviceType: "local"
- serviceName: "orderService"
- serviceMethod: "createOrder"
+ ServiceType: "local"
+ ServiceName: "orderService"
+ ServiceMethod: "createOrder"
CompensateState: "CancelOrder"
ForCompensation: false
ForUpdate: false
@@ -55,9 +55,9 @@ States:
CheckStock:
Type: "ServiceTask"
- serviceType: "local"
- serviceName: "inventoryService"
- serviceMethod: "checkAvailability"
+ ServiceType: "local"
+ ServiceName: "inventoryService"
+ ServiceMethod: "checkAvailability"
CompensateState: "RollbackStock"
ForCompensation: false
ForUpdate: false
@@ -93,9 +93,9 @@ States:
ReserveStock:
Type: "ServiceTask"
- serviceType: "local"
- serviceName: "inventoryService"
- serviceMethod: "reserveItems"
+ ServiceType: "local"
+ ServiceName: "inventoryService"
+ ServiceMethod: "reserveItems"
CompensateState: "RollbackStock"
ForCompensation: false
ForUpdate: false
@@ -122,9 +122,9 @@ States:
ProcessPayment:
Type: "ServiceTask"
- serviceType: "local"
- serviceName: "paymentService"
- serviceMethod: "processPayment"
+ ServiceType: "local"
+ ServiceName: "paymentService"
+ ServiceMethod: "processPayment"
CompensateState: "RefundPayment"
ForCompensation: false
ForUpdate: false
@@ -154,9 +154,9 @@ States:
CancelOrder:
Type: "ServiceTask"
- serviceType: "local"
- serviceName: "orderService"
- serviceMethod: "cancelOrder"
+ ServiceType: "local"
+ ServiceName: "orderService"
+ ServiceMethod: "cancelOrder"
ForCompensation: true
ForUpdate: true
Input:
@@ -167,9 +167,9 @@ States:
RollbackStock:
Type: "ServiceTask"
- serviceType: "local"
- serviceName: "inventoryService"
- serviceMethod: "releaseItems"
+ ServiceType: "local"
+ ServiceName: "inventoryService"
+ ServiceMethod: "releaseItems"
ForCompensation: true
ForUpdate: true
Input:
@@ -181,9 +181,9 @@ States:
RefundPayment:
Type: "ServiceTask"
- serviceType: "local"
- serviceName: "paymentService"
- serviceMethod: "refundPayment"
+ ServiceType: "local"
+ ServiceName: "paymentService"
+ ServiceMethod: "refundPayment"
ForCompensation: true
ForUpdate: true
Input:
@@ -196,9 +196,9 @@ States:
ErrorHandler:
Type: "Fail"
ErrorCode: "ORDER_PROCESSING_ERROR"
- Message: "订单处理过程中发生不可恢复的错误。"
+ Message: "An unrecoverable error occurred during order processing."
FailState:
Type: "Fail"
ErrorCode: "ORDER_CANCELLED"
- Message: "订单已取消,触发补偿完成。"
+ Message: "The order has been cancelled and compensation actions have been
completed."
\ No newline at end of file
diff --git a/pkg/saga/statemachine/engine/config/testdata/order_saga.json
b/pkg/saga/statemachine/engine/config/testdata/order_saga_v2.json
similarity index 71%
copy from pkg/saga/statemachine/engine/config/testdata/order_saga.json
copy to pkg/saga/statemachine/engine/config/testdata/order_saga_v2.json
index 691875cc..e6a3dd07 100644
--- a/pkg/saga/statemachine/engine/config/testdata/order_saga.json
+++ b/pkg/saga/statemachine/engine/config/testdata/order_saga_v2.json
@@ -1,14 +1,45 @@
{
"Name": "OrderSaga",
- "Version": "1.0",
- "StartState": "CreateOrder",
- "trans_operation_timeout": 30000,
+ "Version": "2.0",
+ "StartState": "ValidateOrder",
+ "trans_operation_timeout": 45000,
"States": {
+ "ValidateOrder": {
+ "Type": "ServiceTask",
+ "ServiceType": "local",
+ "ServiceName": "validationService",
+ "ServiceMethod": "validateOrderParams",
+ "CompensateState": "SkipValidation",
+ "ForCompensation": false,
+ "ForUpdate": false,
+ "Catches": [
+ {
+ "Exceptions": [
+ "InvalidOrderParamsException"
+ ],
+ "Next": "ErrorHandler"
+ }
+ ],
+ "Status": {
+ "return.valid == true": "VALID",
+ "return.valid == false": "INVALID",
+ "$exception{*}": "UNKNOWN"
+ },
+ "Input": [
+ {
+ "orderInfo": "$.orderInfo"
+ }
+ ],
+ "Output": {
+ "validationResult": "$.#root"
+ },
+ "Next": "CreateOrder"
+ },
"CreateOrder": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "orderService",
- "serviceMethod": "createOrder",
+ "ServiceType": "local",
+ "ServiceName": "orderService",
+ "ServiceMethod": "createOrder",
"CompensateState": "CancelOrder",
"ForCompensation": false,
"ForUpdate": false,
@@ -18,8 +49,8 @@
"OrderCreationException",
"InventoryUnavailableException"
],
- "IntervalSeconds": 2,
- "MaxAttempts": 3,
+ "IntervalSeconds": 3,
+ "MaxAttempts": 4,
"BackoffRate": 1.5
}
],
@@ -56,9 +87,9 @@
},
"CheckStock": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "inventoryService",
- "serviceMethod": "checkAvailability",
+ "ServiceType": "local",
+ "ServiceName": "inventoryService",
+ "ServiceMethod": "checkAvailability",
"CompensateState": "RollbackStock",
"ForCompensation": false,
"ForUpdate": false,
@@ -114,9 +145,9 @@
},
"ReserveStock": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "inventoryService",
- "serviceMethod": "reserveItems",
+ "ServiceType": "local",
+ "ServiceName": "inventoryService",
+ "ServiceMethod": "reserveItems",
"CompensateState": "RollbackStock",
"ForCompensation": false,
"ForUpdate": false,
@@ -158,9 +189,9 @@
},
"ProcessPayment": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "paymentService",
- "serviceMethod": "processPayment",
+ "ServiceType": "local",
+ "ServiceName": "paymentService",
+ "ServiceMethod": "processPayment",
"CompensateState": "RefundPayment",
"ForCompensation": false,
"ForUpdate": false,
@@ -205,9 +236,9 @@
},
"CancelOrder": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "orderService",
- "serviceMethod": "cancelOrder",
+ "ServiceType": "local",
+ "ServiceName": "orderService",
+ "ServiceMethod": "cancelOrder",
"ForCompensation": true,
"ForUpdate": true,
"Input": [
@@ -222,9 +253,9 @@
},
"RollbackStock": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "inventoryService",
- "serviceMethod": "releaseItems",
+ "ServiceType": "local",
+ "ServiceName": "inventoryService",
+ "ServiceMethod": "releaseItems",
"ForCompensation": true,
"ForUpdate": true,
"Input": [
@@ -242,9 +273,9 @@
},
"RefundPayment": {
"Type": "ServiceTask",
- "serviceType": "local",
- "serviceName": "paymentService",
- "serviceMethod": "refundPayment",
+ "ServiceType": "local",
+ "ServiceName": "paymentService",
+ "ServiceMethod": "refundPayment",
"ForCompensation": true,
"ForUpdate": true,
"Input": [
@@ -262,13 +293,30 @@
},
"ErrorHandler": {
"Type": "Fail",
- "ErrorCode": "ORDER_PROCESSING_ERROR",
- "Message": "An unrecoverable error occurred during order processing."
+ "ErrorCode": "ORDER_PROCESSING_ERROR_V2",
+ "Message": "v2版本:订单处理发生不可恢复错误"
},
"FailState": {
"Type": "Fail",
"ErrorCode": "ORDER_CANCELLED",
"Message": "The order has been cancelled and compensation actions have
been completed."
+ },
+ "SkipValidation": {
+ "Type": "ServiceTask",
+ "ServiceType": "local",
+ "ServiceName": "validationService",
+ "ServiceMethod": "skipValidation",
+ "ForCompensation": true,
+ "ForUpdate": true,
+ "Input": [
+ {
+ "orderId": "$.orderId"
+ }
+ ],
+ "Output": {
+ "skipResult": "Validation compensation skipped"
+ },
+ "Next": "CancelOrder"
}
}
}
\ No newline at end of file
diff --git a/pkg/saga/statemachine/engine/config/testdata/order_saga_v2.yaml
b/pkg/saga/statemachine/engine/config/testdata/order_saga_v2.yaml
new file mode 100644
index 00000000..f2570d67
--- /dev/null
+++ b/pkg/saga/statemachine/engine/config/testdata/order_saga_v2.yaml
@@ -0,0 +1,213 @@
+Name: OrderSaga
+Version: "2.0"
+StartState: ValidateOrder
+trans_operation_timeout: 45000
+States:
+ ValidateOrder:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: validationService
+ ServiceMethod: validateOrderParams
+ CompensateState: SkipValidation
+ ForCompensation: false
+ ForUpdate: false
+ Catches:
+ - Exceptions:
+ - InvalidOrderParamsException
+ Next: ErrorHandler
+ Status:
+ return.valid == true: VALID
+ return.valid == false: INVALID
+ $exception{*}: UNKNOWN
+ Input:
+ - orderInfo: $.orderInfo
+ Output:
+ validationResult: "$.#root"
+ Next: CreateOrder
+ CreateOrder:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: orderService
+ ServiceMethod: createOrder
+ CompensateState: CancelOrder
+ ForCompensation: false
+ ForUpdate: false
+ Retry:
+ - Exceptions:
+ - OrderCreationException
+ - InventoryUnavailableException
+ IntervalSeconds: 3
+ MaxAttempts: 4
+ BackoffRate: 1.5
+ Catches:
+ - Exceptions:
+ - OrderCreationException
+ - InventoryUnavailableException
+ Next: ErrorHandler
+ Status:
+ return.code == 'SUCCESS': SUCCEEDED
+ return.code == 'FAIL': FAILED
+ $exception{*}: UNKNOWN
+ Input:
+ - orderInfo: $.orderInfo
+ Output:
+ orderId: "$.#root"
+ Next: CheckStock
+ Loop:
+ Parallel: 1
+ Collection: $.orderItems
+ ElementVariableName: item
+ ElementIndexName: index
+ CompletionCondition: "[nrOfInstances] == [nrOfCompletedInstances]"
+ CheckStock:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: inventoryService
+ ServiceMethod: checkAvailability
+ CompensateState: RollbackStock
+ ForCompensation: false
+ ForUpdate: false
+ Retry:
+ - Exceptions:
+ - StockCheckException
+ IntervalSeconds: 2
+ MaxAttempts: 2
+ BackoffRate: 1.2
+ Catches:
+ - Exceptions:
+ - StockCheckException
+ Next: ErrorHandler
+ Status:
+ return.available == true: IN_STOCK
+ return.available == false: OUT_OF_STOCK
+ $exception{*}: UNKNOWN
+ Input:
+ - orderId: $.orderId
+ - itemsList: $.orderItems
+ Output:
+ stockAvailable: "$.#root"
+ Next: DecideStock
+ DecideStock:
+ Type: Choice
+ Choices:
+ - Expression: stockAvailable == true
+ Next: ReserveStock
+ - Expression: stockAvailable == false
+ Next: CancelOrder
+ Default: ErrorHandler
+ ReserveStock:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: inventoryService
+ ServiceMethod: reserveItems
+ CompensateState: RollbackStock
+ ForCompensation: false
+ ForUpdate: false
+ Retry:
+ - Exceptions:
+ - StockReservationException
+ IntervalSeconds: 2
+ MaxAttempts: 2
+ BackoffRate: 1.2
+ Catches:
+ - Exceptions:
+ - StockReservationException
+ Next: ErrorHandler
+ Status:
+ return.code == 'RESERVED': STOCK_RESERVED
+ return.code == 'FAILED': FAILED
+ $exception{*}: UNKNOWN
+ Input:
+ - orderId: $.orderId
+ - itemList: $.orderItems
+ Output:
+ stockReservationId: "$.#root"
+ Next: ProcessPayment
+ ProcessPayment:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: paymentService
+ ServiceMethod: processPayment
+ CompensateState: RefundPayment
+ ForCompensation: false
+ ForUpdate: false
+ Retry:
+ - Exceptions:
+ - PaymentProcessingException
+ IntervalSeconds: 3
+ MaxAttempts: 3
+ BackoffRate: 1.5
+ Catches:
+ - Exceptions:
+ - PaymentProcessingException
+ Next: ErrorHandler
+ Status:
+ return.code == 'PAID': PAYMENT_SUCCESS
+ return.code == 'DECLINED': PAYMENT_FAILED
+ $exception{*}: UNKNOWN
+ Input:
+ - orderId: $.orderId
+ - amount: $.orderTotal
+ Output:
+ paymentTransactionId: "$.#root"
+ Next: CompleteOrder
+ CompleteOrder:
+ Type: Succeed
+ CancelOrder:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: orderService
+ ServiceMethod: cancelOrder
+ ForCompensation: true
+ ForUpdate: true
+ Input:
+ - orderId: $.orderId
+ Output:
+ cancelResult: "$.#root"
+ Next: RollbackStock
+ RollbackStock:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: inventoryService
+ ServiceMethod: releaseItems
+ ForCompensation: true
+ ForUpdate: true
+ Input:
+ - orderId: $.orderId
+ - stockReservationId: $.stockReservationId
+ Output:
+ rollbackResult: "$.#root"
+ Next: RefundPayment
+ RefundPayment:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: paymentService
+ ServiceMethod: refundPayment
+ ForCompensation: true
+ ForUpdate: true
+ Input:
+ - orderId: $.orderId
+ - paymentTransactionId: $.paymentTransactionId
+ Output:
+ refundResult: "$.#root"
+ Next: FailState
+ ErrorHandler:
+ Type: Fail
+ ErrorCode: ORDER_PROCESSING_ERROR_V2
+ Message: v2版本:订单处理发生不可恢复错误
+ FailState:
+ Type: Fail
+ ErrorCode: ORDER_CANCELLED
+ Message: The order has been cancelled and compensation actions have been
completed.
+ SkipValidation:
+ Type: ServiceTask
+ ServiceType: local
+ ServiceName: validationService
+ ServiceMethod: skipValidation
+ ForCompensation: true
+ ForUpdate: true
+ Input:
+ - orderId: $.orderId
+ Output:
+ skipResult: Validation compensation skipped
+ Next: CancelOrder
diff --git a/pkg/saga/statemachine/engine/config/testdata/saga_config.json
b/pkg/saga/statemachine/engine/config/testdata/saga_config.json
new file mode 100644
index 00000000..e87a3eb6
--- /dev/null
+++ b/pkg/saga/statemachine/engine/config/testdata/saga_config.json
@@ -0,0 +1,13 @@
+{
+ "trans_operation_timeout": 30000,
+ "service_invoke_timeout": 30000,
+ "charset": "UTF-8",
+ "default_tenant_id": "tenant_1",
+ "saga_retry_persist_mode_update": true,
+ "saga_compensate_persist_mode_update": true,
+ "saga_branch_register_enable": true,
+ "rm_report_success_enable": true,
+ "state_machine_resources": [
+ "testdata/order_saga.json"
+ ]
+}
diff --git a/pkg/saga/statemachine/engine/config/testdata/saga_config.yaml
b/pkg/saga/statemachine/engine/config/testdata/saga_config.yaml
new file mode 100644
index 00000000..a11ed4f6
--- /dev/null
+++ b/pkg/saga/statemachine/engine/config/testdata/saga_config.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+trans_operation_timeout: 30000
+service_invoke_timeout: 30000
+charset: "UTF-8"
+default_tenant_id: "tenant_1"
+saga_retry_persist_mode_update: true
+saga_compensate_persist_mode_update: true
+saga_branch_register_enable: true
+rm_report_success_enable: true
+state_machine_resources:
+ - "testdata/order_saga.json"
\ No newline at end of file
diff --git a/pkg/saga/statemachine/engine/config/testdata/saga_config_v2.json
b/pkg/saga/statemachine/engine/config/testdata/saga_config_v2.json
new file mode 100644
index 00000000..5a5b2dd5
--- /dev/null
+++ b/pkg/saga/statemachine/engine/config/testdata/saga_config_v2.json
@@ -0,0 +1,13 @@
+{
+ "trans_operation_timeout": 30000,
+ "service_invoke_timeout": 30000,
+ "charset": "UTF-8",
+ "default_tenant_id": "tenant_1",
+ "saga_retry_persist_mode_update": true,
+ "saga_compensate_persist_mode_update": true,
+ "saga_branch_register_enable": true,
+ "rm_report_success_enable": true,
+ "state_machine_resources": [
+ "testdata/order_saga_v2.json"
+ ]
+}
\ No newline at end of file
diff --git a/pkg/saga/statemachine/engine/config/testdata/saga_config_v2.yaml
b/pkg/saga/statemachine/engine/config/testdata/saga_config_v2.yaml
new file mode 100644
index 00000000..681c4bfe
--- /dev/null
+++ b/pkg/saga/statemachine/engine/config/testdata/saga_config_v2.yaml
@@ -0,0 +1,10 @@
+trans_operation_timeout: 30000
+service_invoke_timeout: 30000
+charset: UTF-8
+default_tenant_id: tenant_1
+saga_retry_persist_mode_update: true
+saga_compensate_persist_mode_update: true
+saga_branch_register_enable: true
+rm_report_success_enable: true
+state_machine_resources:
+ - testdata/order_saga_v2.json
diff --git
a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
index cc51b552..432e1a59 100644
--- a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
+++ b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
@@ -40,10 +40,16 @@ type ProcessCtrlStateMachineEngine struct {
StateMachineConfig engine.StateMachineConfig
}
-func NewProcessCtrlStateMachineEngine() *ProcessCtrlStateMachineEngine {
- return &ProcessCtrlStateMachineEngine{
- StateMachineConfig: config.NewDefaultStateMachineConfig(),
+func NewProcessCtrlStateMachineEngine() (*ProcessCtrlStateMachineEngine,
error) {
+ cfg, err := config.NewDefaultStateMachineConfig()
+
+ if err != nil {
+ return nil, fmt.Errorf("failed to create state machine
configuration: %w", err)
}
+
+ return &ProcessCtrlStateMachineEngine{
+ StateMachineConfig: cfg,
+ }, nil
}
func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context,
stateMachineName string, tenantId string,
diff --git a/pkg/saga/statemachine/engine/expr/expression_factory_manager.go
b/pkg/saga/statemachine/engine/expr/expression_factory_manager.go
index 82e3f04d..17ad4408 100644
--- a/pkg/saga/statemachine/engine/expr/expression_factory_manager.go
+++ b/pkg/saga/statemachine/engine/expr/expression_factory_manager.go
@@ -18,7 +18,6 @@
package expr
import (
- "maps"
"strings"
)
@@ -42,7 +41,9 @@ func (e *ExpressionFactoryManager)
GetExpressionFactory(expressionType string) E
}
func (e *ExpressionFactoryManager)
SetExpressionFactoryMap(expressionFactoryMap map[string]ExpressionFactory) {
- maps.Copy(e.expressionFactoryMap, expressionFactoryMap)
+ for k, v := range expressionFactoryMap {
+ e.expressionFactoryMap[k] = v
+ }
}
func (e *ExpressionFactoryManager) PutExpressionFactory(expressionType string,
factory ExpressionFactory) {
diff --git
a/pkg/saga/statemachine/engine/repo/repository/state_machine_repository.go
b/pkg/saga/statemachine/engine/repo/repository/state_machine_repository.go
index 5f535a31..8fd3260e 100644
--- a/pkg/saga/statemachine/engine/repo/repository/state_machine_repository.go
+++ b/pkg/saga/statemachine/engine/repo/repository/state_machine_repository.go
@@ -18,6 +18,7 @@
package repository
import (
+ "fmt"
"io"
"sync"
"time"
@@ -102,7 +103,14 @@ func (s *StateMachineRepositoryImpl)
GetStateMachineById(stateMachineId string)
}
func (s *StateMachineRepositoryImpl)
GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
- return s.GetLastVersionStateMachine(stateMachineName, tenantId)
+ sm, err := s.GetLastVersionStateMachine(stateMachineName, tenantId)
+ if err != nil {
+ return nil, err
+ }
+ if sm == nil {
+ return nil, fmt.Errorf("state machine %s (tenant %s) not
found", stateMachineName, tenantId)
+ }
+ return sm, nil
}
func (s *StateMachineRepositoryImpl)
GetLastVersionStateMachine(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
diff --git a/pkg/saga/statemachine/engine/statemachine_config.go
b/pkg/saga/statemachine/engine/statemachine_config.go
index 04db45cd..e6d8d5f0 100644
--- a/pkg/saga/statemachine/engine/statemachine_config.go
+++ b/pkg/saga/statemachine/engine/statemachine_config.go
@@ -18,7 +18,6 @@
package engine
import (
- "github.com/seata/seata-go/pkg/saga/statemachine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/repo"
@@ -69,9 +68,7 @@ type StateMachineConfig interface {
RegisterServiceInvoker(serviceType string, invoker
invoker.ServiceInvoker)
- GetStateMachineDefinition(name string) *statemachine.StateMachineObject
-
GetExpressionFactory(expressionType string) expr.ExpressionFactory
- GetServiceInvoker(serviceType string) invoker.ServiceInvoker
+ GetServiceInvoker(serviceType string) (invoker.ServiceInvoker, error)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]