This is an automated email from the ASF dual-hosted git repository.
zfeng pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/saga by this push:
new c2dfcf0e Feature: finish implementation for StateMachineConfig (#805)
c2dfcf0e is described below
commit c2dfcf0e388552e43839605b0f8419401794788f
Author: flypiggy <[email protected]>
AuthorDate: Thu Jun 19 10:07:11 2025 +0800
Feature: finish implementation for StateMachineConfig (#805)
* finish implementation for StateMachineConfig
* reuse the exiting logic from statemachine_config_parser.go
* add_license&change_cn_to_en&fix_logic
* sort imports & rename struct & change the visbility
* Merge feature/saga into feature/saga-config-selection-and-loading
* fix_conflict_from_expression-pr
* fix_test&Lock-logic_from_invoker.go
* change_method_name
* add-seqGenerator_init
* add-event_bus&event_publiser-init
* add-status_desicion-init & enhence-validate_method
* refactor: move error_expression-about to expr
* fix: stateMachineResources-load
* fix - test
* Optimize the judgment of persistence-related components.
* Optimize the logic of loadconfig and statemachineresource.
* add-store-init
* refactor-engine/core
* Decouple the core package
* Update default_statemachine_config_test.go
---------
Co-authored-by: FengZhang <[email protected]>
---
go.mod | 20 +-
go.sum | 23 +-
.../engine/config/default_statemachine_config.go | 485 ++++++++++++++++++++-
.../config/default_statemachine_config_test.go | 230 ++++++++++
.../statemachine/engine/core/testdata/invalid.json | 3 +
.../core/testdata/invalid.json.comment} | 37 +-
.../statemachine/engine/core/testdata/invalid.yaml | 19 +
.../engine/core/testdata/order_saga.json | 274 ++++++++++++
.../core/testdata/order_saga.json.comment} | 37 +-
.../engine/core/testdata/order_saga.yaml | 204 +++++++++
.../statemachine/engine/expr/error_expression.go | 21 +
.../engine/expr/sequence_expression.go | 2 +-
.../engine/expr/sequence_expression_factory.go | 39 ++
pkg/saga/statemachine/engine/invoker/invoker.go | 160 ++++++-
.../engine/repo/repository/state_log_repository.go | 6 +-
.../statemachine/engine/repo/statemachine_store.go | 2 +-
.../statemachine/engine/statemachine_config.go | 21 +-
pkg/saga/statemachine/process_ctrl/event_bus.go | 16 +
.../statemachine/process_ctrl/event_consumer.go | 5 +-
19 files changed, 1483 insertions(+), 121 deletions(-)
diff --git a/go.mod b/go.mod
index feb9bfb8..f886c5ef 100644
--- a/go.mod
+++ b/go.mod
@@ -49,7 +49,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
- github.com/cespare/xxhash/v2 v2.3.0 // indirect
+ github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 //
indirect
github.com/creasty/defaults v1.5.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -58,7 +58,7 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
- github.com/golang/protobuf v1.5.4 // indirect
+ github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/jinzhu/copier v0.3.5 // indirect
@@ -79,11 +79,10 @@ require (
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 //
indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c //
indirect
- github.com/prometheus/client_model v0.3.0 // indirect
+ github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b //
indirect
github.com/shirou/gopsutil/v3 v3.22.2 // indirect
- github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
@@ -91,10 +90,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.3.0 // indirect
- golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/text v0.14.0 // indirect
- google.golang.org/genproto/googleapis/api
v0.0.0-20230803162519-f966b187b2e5 // indirect
- google.golang.org/genproto/googleapis/rpc
v0.0.0-20230803162519-f966b187b2e5 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)
@@ -106,9 +102,13 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 // indirect
- golang.org/x/crypto v0.19.0 // indirect
- golang.org/x/net v0.21.0 // indirect
- golang.org/x/sys v0.17.0 // indirect
+ github.com/stoewer/go-strcase v1.2.0 // indirect
+ golang.org/x/crypto v0.17.0 // indirect
+ golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
+ golang.org/x/net v0.10.0 // indirect
+ golang.org/x/sys v0.15.0 // indirect
+ google.golang.org/genproto/googleapis/api
v0.0.0-20230803162519-f966b187b2e5 // indirect
+ google.golang.org/genproto/googleapis/rpc
v0.0.0-20230803162519-f966b187b2e5 // indirect
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d //
indirect
)
diff --git a/go.sum b/go.sum
index fae6c71b..68d4931f 100644
--- a/go.sum
+++ b/go.sum
@@ -122,8 +122,8 @@ github.com/certifi/gocertifi
v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6
github.com/cespare/xxhash v1.1.0/go.mod
h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
-github.com/cespare/xxhash/v2 v2.3.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.2.0
h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod
h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311
h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod
h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
@@ -308,8 +308,8 @@ github.com/golang/protobuf v1.4.2/go.mod
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3/go.mod
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
-github.com/golang/protobuf v1.5.4
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
-github.com/golang/protobuf v1.5.4/go.mod
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/golang/protobuf v1.5.3
h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
+github.com/golang/protobuf v1.5.3/go.mod
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1/go.mod
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
@@ -643,9 +643,8 @@ github.com/prometheus/client_model
v0.0.0-20190115171406-56726106282f/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.1.0/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.2.0
h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
-github.com/prometheus/client_model v0.3.0
h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
-github.com/prometheus/client_model v0.3.0/go.mod
h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod
h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.2.0/go.mod
h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.0/go.mod
h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
@@ -840,8 +839,8 @@ golang.org/x/crypto
v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod
h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
-golang.org/x/crypto v0.19.0/go.mod
h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
+golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
+golang.org/x/crypto v0.17.0/go.mod
h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -932,8 +931,8 @@ golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod
h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20211105192438-b53810dc28af/go.mod
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
-golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
-golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
+golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -1032,8 +1031,8 @@ golang.org/x/sys
v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
-golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
+golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/pkg/saga/statemachine/engine/config/default_statemachine_config.go
b/pkg/saga/statemachine/engine/config/default_statemachine_config.go
index 156448c7..78943e74 100644
--- a/pkg/saga/statemachine/engine/config/default_statemachine_config.go
+++ b/pkg/saga/statemachine/engine/config/default_statemachine_config.go
@@ -18,6 +18,18 @@
package config
import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+
+ "gopkg.in/yaml.v3"
+
+ "github.com/seata/seata-go/pkg/saga/statemachine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
@@ -26,6 +38,7 @@ import (
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/store"
"sync"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang/parser"
)
const (
@@ -47,8 +60,17 @@ type DefaultStateMachineConfig struct {
sagaCompensatePersistModeUpdate bool
sagaBranchRegisterEnable bool
rmReportSuccessEnable bool
+ stateMachineResources []string
+
+ // State machine definitions
+ stateMachineDefs map[string]*statemachine.StateMachineObject
// Components
+ processController ProcessController
+
+ // Event Bus
+ syncEventBus EventBus
+ asyncEventBus EventBus
// Event publisher
syncProcessCtrlEventPublisher process_ctrl.EventPublisher
@@ -61,7 +83,7 @@ type DefaultStateMachineConfig struct {
stateMachineRepository repo.StateMachineRepository
// Expression related components
- expressionFactoryManager expr.ExpressionFactoryManager
+ expressionFactoryManager *expr.ExpressionFactoryManager
expressionResolver expr.ExpressionResolver
// Invoker related components
@@ -98,7 +120,15 @@ func (c *DefaultStateMachineConfig)
SetDefaultTenantId(defaultTenantId string) {
c.defaultTenantId = defaultTenantId
}
-func (c *DefaultStateMachineConfig)
SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher
process_ctrl.EventPublisher) {
+func (c *DefaultStateMachineConfig) SetSyncEventBus(syncEventBus EventBus) {
+ c.syncEventBus = syncEventBus
+}
+
+func (c *DefaultStateMachineConfig) SetAsyncEventBus(asyncEventBus EventBus) {
+ c.asyncEventBus = asyncEventBus
+}
+
+func (c *DefaultStateMachineConfig)
SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher EventPublisher) {
c.syncProcessCtrlEventPublisher = syncProcessCtrlEventPublisher
}
@@ -122,7 +152,7 @@ func (c *DefaultStateMachineConfig)
SetStateMachineRepository(stateMachineReposi
c.stateMachineRepository = stateMachineRepository
}
-func (c *DefaultStateMachineConfig)
SetExpressionFactoryManager(expressionFactoryManager
expr.ExpressionFactoryManager) {
+func (c *DefaultStateMachineConfig)
SetExpressionFactoryManager(expressionFactoryManager
*expr.ExpressionFactoryManager) {
c.expressionFactoryManager = expressionFactoryManager
}
@@ -162,7 +192,7 @@ func (c *DefaultStateMachineConfig) StateLangStore()
store.StateLangStore {
return c.stateLangStore
}
-func (c *DefaultStateMachineConfig) ExpressionFactoryManager()
expr.ExpressionFactoryManager {
+func (c *DefaultStateMachineConfig) ExpressionFactoryManager()
*expr.ExpressionFactoryManager {
return c.expressionFactoryManager
}
@@ -178,7 +208,15 @@ func (c *DefaultStateMachineConfig)
StatusDecisionStrategy() engine.StatusDecisi
return c.statusDecisionStrategy
}
-func (c *DefaultStateMachineConfig) EventPublisher()
process_ctrl.EventPublisher {
+func (c *DefaultStateMachineConfig) SyncEventBus() EventBus {
+ return c.syncEventBus
+}
+
+func (c *DefaultStateMachineConfig) AsyncEventBus() EventBus {
+ return c.asyncEventBus
+}
+
+func (c *DefaultStateMachineConfig) EventPublisher() EventPublisher {
return c.syncProcessCtrlEventPublisher
}
@@ -202,15 +240,15 @@ func (c *DefaultStateMachineConfig) SetCharSet(charset
string) {
c.charset = charset
}
-func (c *DefaultStateMachineConfig) DefaultTenantId() string {
+func (c *DefaultStateMachineConfig) GetDefaultTenantId() string {
return c.defaultTenantId
}
-func (c *DefaultStateMachineConfig) TransOperationTimeout() int {
+func (c *DefaultStateMachineConfig) GetTransOperationTimeout() int {
return c.transOperationTimeout
}
-func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int {
+func (c *DefaultStateMachineConfig) GetServiceInvokeTimeout() int {
return c.serviceInvokeTimeout
}
@@ -246,19 +284,438 @@ func (c *DefaultStateMachineConfig)
SetRmReportSuccessEnable(rmReportSuccessEnab
c.rmReportSuccessEnable = rmReportSuccessEnable
}
-func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
+func (c *DefaultStateMachineConfig) GetStateMachineDefinition(name string)
*statemachine.StateMachineObject {
+ return c.stateMachineDefs[name]
+}
+
+func (c *DefaultStateMachineConfig) GetExpressionFactory(expressionType
string) expr.ExpressionFactory {
+ return c.expressionFactoryManager.GetExpressionFactory(expressionType)
+}
+
+func (c *DefaultStateMachineConfig) GetServiceInvoker(serviceType string)
invoker.ServiceInvoker {
+ return c.serviceInvokerManager.ServiceInvoker(serviceType)
+}
+
+func (c *DefaultStateMachineConfig) RegisterStateMachineDef(resources
[]string) error {
+ var allFiles []string
+
+ for _, pattern := range resources {
+ matches, err := filepath.Glob(pattern)
+ if err != nil {
+ return fmt.Errorf("failed to expand glob pattern:
pattern=%s, err=%w", pattern, err)
+ }
+ if len(matches) == 0 {
+ return fmt.Errorf("open resource file failed:
pattern=%s", pattern)
+ }
+ allFiles = append(allFiles, matches...)
+ }
+
+ for _, realPath := range allFiles {
+ file, err := os.Open(realPath)
+ if err != nil {
+ return fmt.Errorf("open resource file failed: path=%s,
err=%w", realPath, err)
+ }
+ defer file.Close()
+
+ if err :=
c.stateMachineRepository.RegistryStateMachineByReader(file); err != nil {
+ return fmt.Errorf("register state machine from file
failed: path=%s, err=%w", realPath, err)
+ }
+ }
+
+ return nil
+}
+
+func (c *DefaultStateMachineConfig) RegisterExpressionFactory(expressionType
string, factory expr.ExpressionFactory) {
+ c.expressionFactoryManager.PutExpressionFactory(expressionType, factory)
+}
+
+func (c *DefaultStateMachineConfig) RegisterServiceInvoker(serviceType string,
invoker invoker.ServiceInvoker) {
+ c.serviceInvokerManager.PutServiceInvoker(serviceType, invoker)
+}
+
+type ConfigFileParams struct {
+ TransOperationTimeout int
`json:"trans_operation_timeout" yaml:"trans_operation_timeout"`
+ ServiceInvokeTimeout int `json:"service_invoke_timeout"
yaml:"service_invoke_timeout"`
+ Charset string `json:"charset" yaml:"charset"`
+ DefaultTenantId string `json:"default_tenant_id"
yaml:"default_tenant_id"`
+ SagaRetryPersistModeUpdate bool
`json:"saga_retry_persist_mode_update" yaml:"saga_retry_persist_mode_update"`
+ SagaCompensatePersistModeUpdate bool
`json:"saga_compensate_persist_mode_update"
yaml:"saga_compensate_persist_mode_update"`
+ SagaBranchRegisterEnable bool
`json:"saga_branch_register_enable" yaml:"saga_branch_register_enable"`
+ RmReportSuccessEnable bool
`json:"rm_report_success_enable" yaml:"rm_report_success_enable"`
+ StateMachineResources []string
`json:"state_machine_resources" yaml:"state_machine_resources"`
+}
+
+func (c *DefaultStateMachineConfig) LoadConfig(configPath string) error {
+ if c.seqGenerator == nil {
+ c.seqGenerator = sequence.NewUUIDSeqGenerator()
+ }
+
+ content, err := os.ReadFile(configPath)
+ if err != nil {
+ return fmt.Errorf("failed to read config file: path=%s,
error=%w", configPath, err)
+ }
+
+ parser := parser.NewStateMachineConfigParser()
+ smo, err := parser.Parse(content)
+ if err != nil {
+ return fmt.Errorf("failed to parse state machine definition:
path=%s, error=%w", configPath, err)
+ }
+
+ var configFileParams ConfigFileParams
+ if err := json.Unmarshal(content, &configFileParams); err != nil {
+ if err := yaml.Unmarshal(content, &configFileParams); err !=
nil {
+ return fmt.Errorf("failed to unmarshal config file as
YAML: %w", err)
+ } else {
+ c.applyConfigFileParams(&configFileParams)
+ }
+ } else {
+ c.applyConfigFileParams(&configFileParams)
+ }
+
+ if _, exists := c.stateMachineDefs[smo.Name]; exists {
+ return fmt.Errorf("state machine definition with name %s
already exists", smo.Name)
+ }
+ c.stateMachineDefs[smo.Name] = smo
+
+ return nil
+}
+
+func (c *DefaultStateMachineConfig) applyConfigFileParams(rc
*ConfigFileParams) {
+ if rc.TransOperationTimeout > 0 {
+ c.transOperationTimeout = rc.TransOperationTimeout
+ }
+ if rc.ServiceInvokeTimeout > 0 {
+ c.serviceInvokeTimeout = rc.ServiceInvokeTimeout
+ }
+ if rc.Charset != "" {
+ c.charset = rc.Charset
+ }
+ if rc.DefaultTenantId != "" {
+ c.defaultTenantId = rc.DefaultTenantId
+ }
+ c.sagaRetryPersistModeUpdate = rc.SagaRetryPersistModeUpdate
+ c.sagaCompensatePersistModeUpdate = rc.SagaCompensatePersistModeUpdate
+ c.sagaBranchRegisterEnable = rc.SagaBranchRegisterEnable
+ c.rmReportSuccessEnable = rc.RmReportSuccessEnable
+ if len(rc.StateMachineResources) > 0 {
+ c.stateMachineResources = rc.StateMachineResources
+ }
+}
+
+func (c *DefaultStateMachineConfig) registerEventConsumers() error {
+ if c.processController == nil {
+ return fmt.Errorf("ProcessController is not initialized")
+ }
+
+ pcImpl, ok := c.processController.(*ProcessControllerImpl)
+ if !ok {
+ return fmt.Errorf("ProcessController is not an instance of
ProcessControllerImpl")
+ }
+
+ if pcImpl.businessProcessor == nil {
+ return fmt.Errorf("BusinessProcessor in ProcessController is
not initialized")
+ }
+
+ processCtrlConsumer := &ProcessCtrlEventConsumer{
+ processController: c.processController,
+ }
+
+ c.syncEventBus.RegisterEventConsumer(processCtrlConsumer)
+ c.asyncEventBus.RegisterEventConsumer(processCtrlConsumer)
+
+ return nil
+}
+
+func (c *DefaultStateMachineConfig) Init() error {
+ if err := c.initExpressionComponents(); err != nil {
+ return fmt.Errorf("initialize expression components failed:
%w", err)
+ }
+
+ if err := c.initServiceInvokers(); err != nil {
+ return fmt.Errorf("initialize service invokers failed: %w", err)
+ }
+
+ if err := c.registerEventConsumers(); err != nil {
+ return fmt.Errorf("register event consumers failed: %w", err)
+ }
+
+ if c.stateMachineRepository != nil && len(c.stateMachineResources) > 0 {
+ if err := c.RegisterStateMachineDef(c.stateMachineResources);
err != nil {
+ return fmt.Errorf("register state machine def failed:
%w", err)
+ }
+ }
+
+ if err := c.Validate(); err != nil {
+ return fmt.Errorf("configuration validation failed: %w", err)
+ }
+
+ return nil
+}
+
+func (c *DefaultStateMachineConfig) initExpressionComponents() error {
+ if c.expressionFactoryManager == nil {
+ c.expressionFactoryManager = expr.NewExpressionFactoryManager()
+ }
+
+ defaultType := expr.DefaultExpressionType
+ if defaultType == "" {
+ defaultType = "Default"
+ }
+
+ if factory :=
c.expressionFactoryManager.GetExpressionFactory(defaultType); factory == nil {
+ c.RegisterExpressionFactory(defaultType,
expr.NewCELExpressionFactory())
+ }
+
+ if factory := c.expressionFactoryManager.GetExpressionFactory("CEL");
factory == nil {
+ c.RegisterExpressionFactory("CEL",
expr.NewCELExpressionFactory())
+ }
+
+ if factory := c.expressionFactoryManager.GetExpressionFactory("el");
factory == nil {
+ c.RegisterExpressionFactory("el",
expr.NewCELExpressionFactory())
+ }
+
+ if c.seqGenerator != nil {
+ sequenceFactory :=
expr.NewSequenceExpressionFactory(c.seqGenerator)
+ c.RegisterExpressionFactory("SEQUENCE", sequenceFactory)
+ c.RegisterExpressionFactory("SEQ", sequenceFactory)
+ }
+
+ if c.expressionResolver == nil {
+ resolver := &expr.DefaultExpressionResolver{}
+
resolver.SetExpressionFactoryManager(*c.expressionFactoryManager)
+ c.expressionResolver = resolver
+ }
+
+ return nil
+}
+
+func (c *DefaultStateMachineConfig) initServiceInvokers() error {
+ if c.serviceInvokerManager == nil {
+ c.serviceInvokerManager = invoker.NewServiceInvokerManagerImpl()
+ }
+
+ defaultServiceType := "local"
+ if existingInvoker :=
c.serviceInvokerManager.ServiceInvoker(defaultServiceType); existingInvoker ==
nil {
+ c.RegisterServiceInvoker(defaultServiceType,
invoker.NewLocalServiceInvoker())
+ }
+
+ return nil
+}
+
+func (c *DefaultStateMachineConfig) Validate() error {
+ var errs []error
+
+ if c.expressionFactoryManager == nil {
+ errs = append(errs, fmt.Errorf("expression factory manager is
nil"))
+ }
+ if c.expressionResolver == nil {
+ errs = append(errs, fmt.Errorf("expression resolver is nil"))
+ }
+ if c.serviceInvokerManager == nil {
+ errs = append(errs, fmt.Errorf("service invoker manager is
nil"))
+ }
+
+ if c.transOperationTimeout <= 0 {
+ errs = append(errs, fmt.Errorf("invalid trans operation
timeout: %d", c.transOperationTimeout))
+ }
+ if c.serviceInvokeTimeout <= 0 {
+ errs = append(errs, fmt.Errorf("invalid service invoke timeout:
%d", c.serviceInvokeTimeout))
+ }
+ if c.charset == "" {
+ errs = append(errs, fmt.Errorf("charset is empty"))
+ }
+
+ if c.stateMachineRepository != nil {
+ if c.stateLogStore == nil {
+ errs = append(errs, fmt.Errorf("state log store is
nil"))
+ }
+ if c.stateLangStore == nil {
+ errs = append(errs, fmt.Errorf("state lang store is
nil"))
+ }
+ if c.stateLogRepository == nil {
+ errs = append(errs, fmt.Errorf("state log repository is
nil"))
+ }
+ }
+
+ if c.statusDecisionStrategy == nil {
+ errs = append(errs, fmt.Errorf("status decision strategy is
nil"))
+ }
+ if c.syncEventBus == nil {
+ errs = append(errs, fmt.Errorf("sync event bus is nil"))
+ }
+ if c.asyncEventBus == nil {
+ errs = append(errs, fmt.Errorf("async event bus is nil"))
+ }
+
+ if len(errs) > 0 {
+ return fmt.Errorf("configuration validation failed with %d
errors: %v", len(errs), errs)
+ }
+ return nil
+}
+
+func (c *DefaultStateMachineConfig) EvaluateExpression(expressionStr string,
context any) (any, error) {
+ if c.expressionResolver == nil {
+ return nil, fmt.Errorf("expression resolver not initialized")
+ }
+
+ expression := c.expressionResolver.Expression(expressionStr)
+ if expression == nil {
+ return nil, fmt.Errorf("failed to parse expression: %s",
expressionStr)
+ }
+
+ var result any
+ var evalErr error
+
+ func() {
+ defer func() {
+ if r := recover(); r != nil {
+ evalErr = fmt.Errorf("expression evaluation
panicked: %v", r)
+ }
+ }()
+
+ result = expression.Value(context)
+ }()
+
+ if evalErr != nil {
+ return nil, evalErr
+ }
+
+ if err, ok := result.(error); ok {
+ return nil, fmt.Errorf("expression evaluation returned error:
%w", err)
+ }
+
+ return result, nil
+}
+
+func NewDefaultBusinessProcessor() *DefaultBusinessProcessor {
+ return &DefaultBusinessProcessor{
+ processHandlers: make(map[string]ProcessHandler),
+ routerHandlers: make(map[string]RouterHandler),
+ }
+}
+
+func NewDefaultStateMachineConfig(opts ...Option) *DefaultStateMachineConfig {
+ ctx := context.Background()
+ defaultBP := NewDefaultBusinessProcessor()
+
+ // stateMachineResources for development only; production uses env
vars/config files.
+ stateMachineResources := []string{
+ "testdata/saga/statelang/**/*.json",
+ "testdata/saga/statelang/**/*.yaml",
+ }
+
+ if envPaths := os.Getenv("SEATA_STATE_MACHINE_RESOURCES"); envPaths !=
"" {
+ paths := strings.Split(envPaths, ",")
+ filteredPaths := make([]string, 0, len(paths))
+ for _, p := range paths {
+ p = strings.TrimSpace(p)
+ if p != "" {
+ filteredPaths = append(filteredPaths, p)
+ }
+ }
+ if len(filteredPaths) > 0 {
+ stateMachineResources = filteredPaths
+ }
+ }
+
c := &DefaultStateMachineConfig{
- transOperationTimeout: DefaultTransOperTimeout,
- serviceInvokeTimeout: DefaultServiceInvokeTimeout,
- charset: "UTF-8",
- defaultTenantId: "000001",
+ transOperationTimeout: DefaultTransOperTimeout,
+ serviceInvokeTimeout: DefaultServiceInvokeTimeout,
+ charset: "UTF-8",
+ defaultTenantId: "000001",
+
+ stateMachineResources: stateMachineResources,
+
sagaRetryPersistModeUpdate:
DefaultClientSagaRetryPersistModeUpdate,
sagaCompensatePersistModeUpdate:
DefaultClientSagaCompensatePersistModeUpdate,
sagaBranchRegisterEnable:
DefaultClientSagaBranchRegisterEnable,
rmReportSuccessEnable:
DefaultClientReportSuccessEnable,
+ stateMachineDefs:
make(map[string]*statemachine.StateMachineObject),
componentLock: &sync.Mutex{},
+ seqGenerator: sequence.NewUUIDSeqGenerator(),
+
+ statusDecisionStrategy: NewDefaultStatusDecisionStrategy(),
+ processController: &ProcessControllerImpl{
+ businessProcessor: defaultBP,
+ },
+
+ syncEventBus: NewDirectEventBus(),
+ asyncEventBus: NewAsyncEventBus(ctx, 1000, 5),
+
+ syncProcessCtrlEventPublisher: nil,
+ asyncProcessCtrlEventPublisher: nil,
+ }
+
+ c.syncProcessCtrlEventPublisher =
NewProcessCtrlEventPublisher(c.syncEventBus)
+ c.asyncProcessCtrlEventPublisher =
NewProcessCtrlEventPublisher(c.asyncEventBus)
+
+ if err := c.LoadConfig("config.yaml"); err == nil {
+ log.Printf("Successfully loaded config from config.yaml")
+ } else {
+ log.Printf("Failed to load config file (using default/env
values): %v", err)
+ }
+
+ for _, opt := range opts {
+ opt(c)
}
- // TODO: init config
return c
}
+
+type Option func(*DefaultStateMachineConfig)
+
+func WithStatusDecisionStrategy(strategy StatusDecisionStrategy) Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.statusDecisionStrategy = strategy
+ }
+}
+
+func WithSeqGenerator(gen sequence.SeqGenerator) Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.seqGenerator = gen
+ }
+}
+
+func WithProcessController(ctrl ProcessController) Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.processController = ctrl
+ }
+}
+
+func WithBusinessProcessor(bp BusinessProcessor) Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.processController.(*ProcessControllerImpl).businessProcessor
= bp
+ }
+}
+
+func WithStateMachineResources(paths []string) Option {
+ return func(c *DefaultStateMachineConfig) {
+ if len(paths) > 0 {
+ c.stateMachineResources = paths
+ }
+ }
+}
+
+func WithStateLogRepository(logRepo StateLogRepository) Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.stateLogRepository = logRepo
+ }
+}
+
+func WithStateLogStore(logStore StateLogStore) Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.stateLogStore = logStore
+ }
+}
+
+func WithStateLangStore(langStore StateLangStore) Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.stateLangStore = langStore
+ }
+}
+
+func WithStateMachineRepository(machineRepo StateMachineRepository) Option {
+ return func(c *DefaultStateMachineConfig) {
+ c.stateMachineRepository = machineRepo
+ }
+}
diff --git
a/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
b/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
new file mode 100644
index 00000000..82e5c427
--- /dev/null
+++ b/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "errors"
+ "io"
+ "path/filepath"
+ "testing"
+
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDefaultStateMachineConfig_LoadValidJSON(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ testFile := filepath.Join("testdata", "order_saga.json")
+
+ err := config.LoadConfig(testFile)
+ assert.NoError(t, err, "Loading JSON configuration should succeed")
+
+ smo := config.GetStateMachineDefinition("OrderSaga")
+ assert.NotNil(t, smo, "State machine definition should not be nil")
+ assert.Equal(t, "CreateOrder", smo.StartState, "The start state should
be correct")
+ assert.Contains(t, smo.States, "CreateOrder", "The state node should
exist")
+
+ assert.Equal(t, 30000, config.transOperationTimeout, "The timeout
should be read correctly")
+}
+
+func TestDefaultStateMachineConfig_LoadValidYAML(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ testFile := filepath.Join("testdata", "order_saga.yaml")
+
+ err := config.LoadConfig(testFile)
+ assert.NoError(t, err, "Loading YAML configuration should succeed")
+
+ smo := config.GetStateMachineDefinition("OrderSaga")
+ assert.NotNil(t, smo)
+}
+
+func TestLoadNonExistentFile(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ err := config.LoadConfig("non_existent.json")
+ assert.Error(t, err, "Loading a non-existent file should report an
error")
+ assert.Contains(t, err.Error(), "failed to read config file", "The
error message should contain file read failure")
+}
+
+func TestGetStateMachineDefinition_Exists(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ _ = config.LoadConfig(filepath.Join("testdata", "order_saga.json"))
+
+ smo := config.GetStateMachineDefinition("OrderSaga")
+ assert.NotNil(t, smo)
+ assert.Equal(t, "1.0", smo.Version, "The version number should be
correct")
+}
+
+func TestGetNonExistentStateMachine(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ smo := config.GetStateMachineDefinition("NonExistent")
+ assert.Nil(t, smo, "An unloaded state machine should return nil")
+}
+
+func TestLoadDuplicateStateMachine(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ testFile := filepath.Join("testdata", "order_saga.json")
+
+ err := config.LoadConfig(testFile)
+ assert.NoError(t, err)
+
+ err = config.LoadConfig(testFile)
+ assert.Error(t, err, "Duplicate loading should trigger a name conflict")
+ assert.Contains(t, err.Error(), "already exists", "The error message
should contain a conflict prompt")
+}
+
+func TestRuntimeConfig_OverrideDefaults(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ assert.Equal(t, "UTF-8", config.charset, "The default character set
should be UTF-8")
+
+ _ = config.LoadConfig(filepath.Join("testdata", "order_saga.json"))
+ assert.Equal(t, "UTF-8", config.charset, "If the configuration does not
specify, the default value should be used")
+
+ customConfig := &ConfigFileParams{
+ Charset: "GBK",
+ }
+ config.applyConfigFileParams(customConfig)
+ assert.Equal(t, "GBK", config.charset, "Runtime parameters should be
correctly overridden")
+}
+
+func TestGetDefaultExpressionFactory(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+
+ err := config.Init()
+ assert.NoError(t, err, "Init should not return error")
+
+ factory := config.GetExpressionFactory("el")
+ assert.NotNil(t, factory, "The default EL factory should exist")
+
+ unknownFactory := config.GetExpressionFactory("unknown")
+ assert.Nil(t, unknownFactory, "An unknown expression type should return
nil")
+}
+
+func TestGetServiceInvoker(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ if err := config.Init(); err != nil {
+ t.Fatalf("init config failed: %v", err)
+ }
+
+ invoker := config.GetServiceInvoker("local")
+ if invoker == nil {
+ t.Errorf("expected non-nil invoker, got nil")
+ }
+}
+
+func TestLoadConfig_InvalidJSON(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ testFile := filepath.Join("testdata", "invalid.json")
+
+ err := config.LoadConfig(testFile)
+ assert.Error(t, err, "Loading an invalid JSON configuration should
report an error")
+ assert.Contains(t, err.Error(), "failed to parse state machine
definition", "The error message should contain parsing failure")
+}
+
+func TestLoadConfig_InvalidYAML(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ testFile := filepath.Join("testdata", "invalid.yaml")
+
+ err := config.LoadConfig(testFile)
+ assert.Error(t, err, "Loading an invalid YAML configuration should
report an error")
+ assert.Contains(t, err.Error(), "failed to parse state machine
definition", "The error message should contain parsing failure")
+}
+
+func TestRegisterStateMachineDef_Fail(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ invalidResource := []string{"invalid_path.json"}
+
+ err := config.RegisterStateMachineDef(invalidResource)
+ assert.Error(t, err, "Registering an invalid resource should report an
error")
+ assert.Contains(t, err.Error(), "open resource file failed", "The error
message should contain file opening failure")
+}
+
+func TestInit_ExpressionFactoryManagerNil(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ config.expressionFactoryManager = nil
+
+ err := config.Init()
+ assert.NoError(t, err, "Initialization should succeed when the
expression factory manager is nil")
+}
+
+func TestInit_ServiceInvokerManagerNil(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ config.serviceInvokerManager = nil
+
+ err := config.Init()
+ assert.NoError(t, err, "Initialization should succeed when the service
invoker manager is nil")
+}
+
+func TestInit_StateMachineRepositoryNil(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ config.stateMachineRepository = nil
+
+ err := config.Init()
+ assert.NoError(t, err, "Initialization should succeed when the state
machine repository is nil")
+}
+
+func TestApplyRuntimeConfig_BoundaryValues(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ customConfig := &ConfigFileParams{
+ TransOperationTimeout: 1,
+ ServiceInvokeTimeout: 1,
+ }
+ config.applyConfigFileParams(customConfig)
+ assert.Equal(t, 1, config.transOperationTimeout, "The minimum
transaction operation timeout should be correctly applied")
+ assert.Equal(t, 1, config.serviceInvokeTimeout, "The minimum service
invocation timeout should be correctly applied")
+
+ maxTimeout := int(^uint(0) >> 1)
+ customConfig = &ConfigFileParams{
+ TransOperationTimeout: maxTimeout,
+ ServiceInvokeTimeout: maxTimeout,
+ }
+ config.applyConfigFileParams(customConfig)
+ assert.Equal(t, maxTimeout, config.transOperationTimeout, "The maximum
transaction operation timeout should be correctly applied")
+ assert.Equal(t, maxTimeout, config.serviceInvokeTimeout, "The maximum
service invocation timeout should be correctly applied")
+}
+
+type TestStateMachineRepositoryMock struct{}
+
+func (m *TestStateMachineRepositoryMock) GetStateMachineById(stateMachineId
string) (statelang.StateMachine, error) {
+ return nil, errors.New("get state machine by id failed")
+}
+
+func (m *TestStateMachineRepositoryMock)
GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
+ return nil, errors.New("get state machine by name and tenant id failed")
+}
+
+func (m *TestStateMachineRepositoryMock)
GetLastVersionStateMachine(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
+ return nil, errors.New("get last version state machine failed")
+}
+
+func (m *TestStateMachineRepositoryMock) RegistryStateMachine(machine
statelang.StateMachine) error {
+ return errors.New("registry state machine failed")
+}
+
+func (m *TestStateMachineRepositoryMock) RegistryStateMachineByReader(reader
io.Reader) error {
+ return errors.New("registry state machine by reader failed")
+}
+
+func TestRegisterStateMachineDef_RepositoryError(t *testing.T) {
+ config := NewDefaultStateMachineConfig()
+ config.stateMachineRepository = &TestStateMachineRepositoryMock{}
+ resource := []string{filepath.Join("testdata", "order_saga.json")}
+
+ err := config.RegisterStateMachineDef(resource)
+ assert.Error(t, err, "Registration should fail when the state machine
repository reports an error")
+ assert.Contains(t, err.Error(), "register state machine from file
failed", "The error message should contain registration failure")
+}
diff --git a/pkg/saga/statemachine/engine/core/testdata/invalid.json
b/pkg/saga/statemachine/engine/core/testdata/invalid.json
new file mode 100644
index 00000000..7f171d97
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/testdata/invalid.json
@@ -0,0 +1,3 @@
+{
+ "name": "John",
+ "age": 30,
\ No newline at end of file
diff --git a/pkg/saga/statemachine/process_ctrl/event_consumer.go
b/pkg/saga/statemachine/engine/core/testdata/invalid.json.comment
similarity index 52%
copy from pkg/saga/statemachine/process_ctrl/event_consumer.go
copy to pkg/saga/statemachine/engine/core/testdata/invalid.json.comment
index 5058b1f3..d973dced 100644
--- a/pkg/saga/statemachine/process_ctrl/event_consumer.go
+++ b/pkg/saga/statemachine/engine/core/testdata/invalid.json.comment
@@ -13,39 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package process_ctrl
-
-import (
- "context"
- "fmt"
- "github.com/pkg/errors"
-)
-
-type EventConsumer interface {
- Accept(event Event) bool
-
- Process(ctx context.Context, event Event) error
-}
-
-type ProcessCtrlEventConsumer struct {
- processController ProcessController
-}
-
-func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
- if event == nil {
- return false
- }
-
- _, ok := event.(ProcessContext)
- return ok
-}
-
-func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event)
error {
- processContext, ok := event.(ProcessContext)
- if !ok {
- return errors.New(fmt.Sprint("event %T is illegal, required
process_ctrl.ProcessContext", event))
- }
- return p.processController.Process(ctx, processContext)
-}
+ */
\ No newline at end of file
diff --git a/pkg/saga/statemachine/engine/core/testdata/invalid.yaml
b/pkg/saga/statemachine/engine/core/testdata/invalid.yaml
new file mode 100644
index 00000000..872937fa
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/testdata/invalid.yaml
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+parent:
+ child1: value1
+ child2: value2
+ child3: value3
\ No newline at end of file
diff --git a/pkg/saga/statemachine/engine/core/testdata/order_saga.json
b/pkg/saga/statemachine/engine/core/testdata/order_saga.json
new file mode 100644
index 00000000..691875cc
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/testdata/order_saga.json
@@ -0,0 +1,274 @@
+{
+ "Name": "OrderSaga",
+ "Version": "1.0",
+ "StartState": "CreateOrder",
+ "trans_operation_timeout": 30000,
+ "States": {
+ "CreateOrder": {
+ "Type": "ServiceTask",
+ "serviceType": "local",
+ "serviceName": "orderService",
+ "serviceMethod": "createOrder",
+ "CompensateState": "CancelOrder",
+ "ForCompensation": false,
+ "ForUpdate": false,
+ "Retry": [
+ {
+ "Exceptions": [
+ "OrderCreationException",
+ "InventoryUnavailableException"
+ ],
+ "IntervalSeconds": 2,
+ "MaxAttempts": 3,
+ "BackoffRate": 1.5
+ }
+ ],
+ "Catches": [
+ {
+ "Exceptions": [
+ "OrderCreationException",
+ "InventoryUnavailableException"
+ ],
+ "Next": "ErrorHandler"
+ }
+ ],
+ "Status": {
+ "return.code == 'SUCCESS'": "SUCCEEDED",
+ "return.code == 'FAIL'": "FAILED",
+ "$exception{*}": "UNKNOWN"
+ },
+ "Input": [
+ {
+ "orderInfo": "$.orderInfo"
+ }
+ ],
+ "Output": {
+ "orderId": "$.#root"
+ },
+ "Next": "CheckStock",
+ "Loop": {
+ "Parallel": 1,
+ "Collection": "$.orderItems",
+ "ElementVariableName": "item",
+ "ElementIndexName": "index",
+ "CompletionCondition": "[nrOfInstances] == [nrOfCompletedInstances]"
+ }
+ },
+ "CheckStock": {
+ "Type": "ServiceTask",
+ "serviceType": "local",
+ "serviceName": "inventoryService",
+ "serviceMethod": "checkAvailability",
+ "CompensateState": "RollbackStock",
+ "ForCompensation": false,
+ "ForUpdate": false,
+ "Retry": [
+ {
+ "Exceptions": [
+ "StockCheckException"
+ ],
+ "IntervalSeconds": 2,
+ "MaxAttempts": 2,
+ "BackoffRate": 1.2
+ }
+ ],
+ "Catches": [
+ {
+ "Exceptions": [
+ "StockCheckException"
+ ],
+ "Next": "ErrorHandler"
+ }
+ ],
+ "Status": {
+ "return.available == true": "IN_STOCK",
+ "return.available == false": "OUT_OF_STOCK",
+ "$exception{*}": "UNKNOWN"
+ },
+ "Input": [
+ {
+ "orderId": "$.orderId"
+ },
+ {
+ "itemsList": "$.orderItems"
+ }
+ ],
+ "Output": {
+ "stockAvailable": "$.#root"
+ },
+ "Next": "DecideStock"
+ },
+ "DecideStock": {
+ "Type": "Choice",
+ "Choices": [
+ {
+ "Expression": "stockAvailable == true",
+ "Next": "ReserveStock"
+ },
+ {
+ "Expression": "stockAvailable == false",
+ "Next": "CancelOrder"
+ }
+ ],
+ "Default": "ErrorHandler"
+ },
+ "ReserveStock": {
+ "Type": "ServiceTask",
+ "serviceType": "local",
+ "serviceName": "inventoryService",
+ "serviceMethod": "reserveItems",
+ "CompensateState": "RollbackStock",
+ "ForCompensation": false,
+ "ForUpdate": false,
+ "Retry": [
+ {
+ "Exceptions": [
+ "StockReservationException"
+ ],
+ "IntervalSeconds": 2,
+ "MaxAttempts": 2,
+ "BackoffRate": 1.2
+ }
+ ],
+ "Catches": [
+ {
+ "Exceptions": [
+ "StockReservationException"
+ ],
+ "Next": "ErrorHandler"
+ }
+ ],
+ "Status": {
+ "return.code == 'RESERVED'": "STOCK_RESERVED",
+ "return.code == 'FAILED'": "FAILED",
+ "$exception{*}": "UNKNOWN"
+ },
+ "Input": [
+ {
+ "orderId": "$.orderId"
+ },
+ {
+ "itemList": "$.orderItems"
+ }
+ ],
+ "Output": {
+ "stockReservationId": "$.#root"
+ },
+ "Next": "ProcessPayment"
+ },
+ "ProcessPayment": {
+ "Type": "ServiceTask",
+ "serviceType": "local",
+ "serviceName": "paymentService",
+ "serviceMethod": "processPayment",
+ "CompensateState": "RefundPayment",
+ "ForCompensation": false,
+ "ForUpdate": false,
+ "Retry": [
+ {
+ "Exceptions": [
+ "PaymentProcessingException"
+ ],
+ "IntervalSeconds": 3,
+ "MaxAttempts": 3,
+ "BackoffRate": 1.5
+ }
+ ],
+ "Catches": [
+ {
+ "Exceptions": [
+ "PaymentProcessingException"
+ ],
+ "Next": "ErrorHandler"
+ }
+ ],
+ "Status": {
+ "return.code == 'PAID'": "PAYMENT_SUCCESS",
+ "return.code == 'DECLINED'": "PAYMENT_FAILED",
+ "$exception{*}": "UNKNOWN"
+ },
+ "Input": [
+ {
+ "orderId": "$.orderId"
+ },
+ {
+ "amount": "$.orderTotal"
+ }
+ ],
+ "Output": {
+ "paymentTransactionId": "$.#root"
+ },
+ "Next": "CompleteOrder"
+ },
+ "CompleteOrder": {
+ "Type": "Succeed"
+ },
+ "CancelOrder": {
+ "Type": "ServiceTask",
+ "serviceType": "local",
+ "serviceName": "orderService",
+ "serviceMethod": "cancelOrder",
+ "ForCompensation": true,
+ "ForUpdate": true,
+ "Input": [
+ {
+ "orderId": "$.orderId"
+ }
+ ],
+ "Output": {
+ "cancelResult": "$.#root"
+ },
+ "Next": "RollbackStock"
+ },
+ "RollbackStock": {
+ "Type": "ServiceTask",
+ "serviceType": "local",
+ "serviceName": "inventoryService",
+ "serviceMethod": "releaseItems",
+ "ForCompensation": true,
+ "ForUpdate": true,
+ "Input": [
+ {
+ "orderId": "$.orderId"
+ },
+ {
+ "stockReservationId": "$.stockReservationId"
+ }
+ ],
+ "Output": {
+ "rollbackResult": "$.#root"
+ },
+ "Next": "RefundPayment"
+ },
+ "RefundPayment": {
+ "Type": "ServiceTask",
+ "serviceType": "local",
+ "serviceName": "paymentService",
+ "serviceMethod": "refundPayment",
+ "ForCompensation": true,
+ "ForUpdate": true,
+ "Input": [
+ {
+ "orderId": "$.orderId"
+ },
+ {
+ "paymentTransactionId": "$.paymentTransactionId"
+ }
+ ],
+ "Output": {
+ "refundResult": "$.#root"
+ },
+ "Next": "FailState"
+ },
+ "ErrorHandler": {
+ "Type": "Fail",
+ "ErrorCode": "ORDER_PROCESSING_ERROR",
+ "Message": "An unrecoverable error occurred during order processing."
+ },
+ "FailState": {
+ "Type": "Fail",
+ "ErrorCode": "ORDER_CANCELLED",
+ "Message": "The order has been cancelled and compensation actions have
been completed."
+ }
+ }
+}
\ No newline at end of file
diff --git a/pkg/saga/statemachine/process_ctrl/event_consumer.go
b/pkg/saga/statemachine/engine/core/testdata/order_saga.json.comment
similarity index 52%
copy from pkg/saga/statemachine/process_ctrl/event_consumer.go
copy to pkg/saga/statemachine/engine/core/testdata/order_saga.json.comment
index 5058b1f3..d973dced 100644
--- a/pkg/saga/statemachine/process_ctrl/event_consumer.go
+++ b/pkg/saga/statemachine/engine/core/testdata/order_saga.json.comment
@@ -13,39 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package process_ctrl
-
-import (
- "context"
- "fmt"
- "github.com/pkg/errors"
-)
-
-type EventConsumer interface {
- Accept(event Event) bool
-
- Process(ctx context.Context, event Event) error
-}
-
-type ProcessCtrlEventConsumer struct {
- processController ProcessController
-}
-
-func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
- if event == nil {
- return false
- }
-
- _, ok := event.(ProcessContext)
- return ok
-}
-
-func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event)
error {
- processContext, ok := event.(ProcessContext)
- if !ok {
- return errors.New(fmt.Sprint("event %T is illegal, required
process_ctrl.ProcessContext", event))
- }
- return p.processController.Process(ctx, processContext)
-}
+ */
\ No newline at end of file
diff --git a/pkg/saga/statemachine/engine/core/testdata/order_saga.yaml
b/pkg/saga/statemachine/engine/core/testdata/order_saga.yaml
new file mode 100644
index 00000000..39439d20
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/testdata/order_saga.yaml
@@ -0,0 +1,204 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Name: "OrderSaga"
+Version: "1.0"
+StartState: "CreateOrder"
+trans_operation_timeout: 30000
+States:
+ CreateOrder:
+ Type: "ServiceTask"
+ serviceType: "local"
+ serviceName: "orderService"
+ serviceMethod: "createOrder"
+ CompensateState: "CancelOrder"
+ ForCompensation: false
+ ForUpdate: false
+ Retry:
+ - Exceptions:
+ - "OrderCreationException"
+ - "InventoryUnavailableException"
+ IntervalSeconds: 2
+ MaxAttempts: 3
+ BackoffRate: 1.5
+ Catches:
+ - Exceptions:
+ - "OrderCreationException"
+ - "InventoryUnavailableException"
+ Next: "ErrorHandler"
+ Status:
+ "return.code == 'SUCCESS'": "SUCCEEDED"
+ "return.code == 'FAIL'": "FAILED"
+ "$exception{*}": "UNKNOWN"
+ Input:
+ - orderInfo: "$.orderInfo"
+ Output:
+ orderId: "$.#root"
+ Next: "CheckStock"
+ Loop:
+ Parallel: 1
+ Collection: "$.orderItems"
+ ElementVariableName: "item"
+ ElementIndexName: "index"
+ CompletionCondition: "[nrOfInstances] == [nrOfCompletedInstances]"
+
+ CheckStock:
+ Type: "ServiceTask"
+ serviceType: "local"
+ serviceName: "inventoryService"
+ serviceMethod: "checkAvailability"
+ CompensateState: "RollbackStock"
+ ForCompensation: false
+ ForUpdate: false
+ Retry:
+ - Exceptions:
+ - "StockCheckException"
+ IntervalSeconds: 2
+ MaxAttempts: 2
+ BackoffRate: 1.2
+ Catches:
+ - Exceptions:
+ - "StockCheckException"
+ Next: "ErrorHandler"
+ Status:
+ "return.available == true": "IN_STOCK"
+ "return.available == false": "OUT_OF_STOCK"
+ "$exception{*}": "UNKNOWN"
+ Input:
+ - orderId: "$.orderId"
+ - itemsList: "$.orderItems"
+ Output:
+ stockAvailable: "$.#root"
+ Next: "DecideStock"
+
+ DecideStock:
+ Type: "Choice"
+ Choices:
+ - Expression: "stockAvailable == true"
+ Next: "ReserveStock"
+ - Expression: "stockAvailable == false"
+ Next: "CancelOrder"
+ Default: "ErrorHandler"
+
+ ReserveStock:
+ Type: "ServiceTask"
+ serviceType: "local"
+ serviceName: "inventoryService"
+ serviceMethod: "reserveItems"
+ CompensateState: "RollbackStock"
+ ForCompensation: false
+ ForUpdate: false
+ Retry:
+ - Exceptions:
+ - "StockReservationException"
+ IntervalSeconds: 2
+ MaxAttempts: 2
+ BackoffRate: 1.2
+ Catches:
+ - Exceptions:
+ - "StockReservationException"
+ Next: "ErrorHandler"
+ Status:
+ "return.code == 'RESERVED'": "STOCK_RESERVED"
+ "return.code == 'FAILED'": "FAILED"
+ "$exception{*}": "UNKNOWN"
+ Input:
+ - orderId: "$.orderId"
+ - itemList: "$.orderItems"
+ Output:
+ stockReservationId: "$.#root"
+ Next: "ProcessPayment"
+
+ ProcessPayment:
+ Type: "ServiceTask"
+ serviceType: "local"
+ serviceName: "paymentService"
+ serviceMethod: "processPayment"
+ CompensateState: "RefundPayment"
+ ForCompensation: false
+ ForUpdate: false
+ Retry:
+ - Exceptions:
+ - "PaymentProcessingException"
+ IntervalSeconds: 3
+ MaxAttempts: 3
+ BackoffRate: 1.5
+ Catches:
+ - Exceptions:
+ - "PaymentProcessingException"
+ Next: "ErrorHandler"
+ Status:
+ "return.code == 'PAID'": "PAYMENT_SUCCESS"
+ "return.code == 'DECLINED'": "PAYMENT_FAILED"
+ "$exception{*}": "UNKNOWN"
+ Input:
+ - orderId: "$.orderId"
+ - amount: "$.orderTotal"
+ Output:
+ paymentTransactionId: "$.#root"
+ Next: "CompleteOrder"
+
+ CompleteOrder:
+ Type: "Succeed"
+
+ CancelOrder:
+ Type: "ServiceTask"
+ serviceType: "local"
+ serviceName: "orderService"
+ serviceMethod: "cancelOrder"
+ ForCompensation: true
+ ForUpdate: true
+ Input:
+ - orderId: "$.orderId"
+ Output:
+ cancelResult: "$.#root"
+ Next: "RollbackStock"
+
+ RollbackStock:
+ Type: "ServiceTask"
+ serviceType: "local"
+ serviceName: "inventoryService"
+ serviceMethod: "releaseItems"
+ ForCompensation: true
+ ForUpdate: true
+ Input:
+ - orderId: "$.orderId"
+ - stockReservationId: "$.stockReservationId"
+ Output:
+ rollbackResult: "$.#root"
+ Next: "RefundPayment"
+
+ RefundPayment:
+ Type: "ServiceTask"
+ serviceType: "local"
+ serviceName: "paymentService"
+ serviceMethod: "refundPayment"
+ ForCompensation: true
+ ForUpdate: true
+ Input:
+ - orderId: "$.orderId"
+ - paymentTransactionId: "$.paymentTransactionId"
+ Output:
+ refundResult: "$.#root"
+ Next: "FailState"
+
+ ErrorHandler:
+ Type: "Fail"
+ ErrorCode: "ORDER_PROCESSING_ERROR"
+ Message: "订单处理过程中发生不可恢复的错误。"
+
+ FailState:
+ Type: "Fail"
+ ErrorCode: "ORDER_CANCELLED"
+ Message: "订单已取消,触发补偿完成。"
diff --git a/pkg/saga/statemachine/engine/expr/error_expression.go
b/pkg/saga/statemachine/engine/expr/error_expression.go
new file mode 100644
index 00000000..da44804b
--- /dev/null
+++ b/pkg/saga/statemachine/engine/expr/error_expression.go
@@ -0,0 +1,21 @@
+package expr
+
+// ErrorExpression is a placeholder implementation that always reports an
error.
+// When parsing/constructing an expression fails, this type is returned
directly.
+// The Value() method returns the error as-is, and SetValue() is a no-op.
+type ErrorExpression struct {
+ err error
+ expressionStr string
+}
+
+func (e *ErrorExpression) Value(elContext any) any {
+ return e.err
+}
+
+func (e *ErrorExpression) SetValue(value any, elContext any) {
+ // No write operation for error expressions
+}
+
+func (e *ErrorExpression) ExpressionString() string {
+ return e.expressionStr
+}
diff --git a/pkg/saga/statemachine/engine/expr/sequence_expression.go
b/pkg/saga/statemachine/engine/expr/sequence_expression.go
index d6509252..51f48187 100644
--- a/pkg/saga/statemachine/engine/expr/sequence_expression.go
+++ b/pkg/saga/statemachine/engine/expr/sequence_expression.go
@@ -51,7 +51,7 @@ func (s *SequenceExpression) SetRule(rule string) {
s.rule = rule
}
-func (s SequenceExpression) Value(vars map[string]any) any {
+func (s SequenceExpression) Value(elContext any) any {
return s.seqGenerator.GenerateId(s.entity, s.rule)
}
diff --git a/pkg/saga/statemachine/engine/expr/sequence_expression_factory.go
b/pkg/saga/statemachine/engine/expr/sequence_expression_factory.go
new file mode 100644
index 00000000..682c84b5
--- /dev/null
+++ b/pkg/saga/statemachine/engine/expr/sequence_expression_factory.go
@@ -0,0 +1,39 @@
+package expr
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
+)
+
+// SequenceExpressionFactory implements the ExpressionFactory interface,
+// designed to parse strings in the format "entity|rule" and create
SequenceExpression instances.
+// If the format is invalid, it returns an *ErrorExpression containing the
parsing error.
+type SequenceExpressionFactory struct {
+ seqGenerator sequence.SeqGenerator
+}
+
+func NewSequenceExpressionFactory(seqGenerator sequence.SeqGenerator)
*SequenceExpressionFactory {
+ return &SequenceExpressionFactory{seqGenerator: seqGenerator}
+}
+
+// CreateExpression parses the input string into a SequenceExpression.
+// The input must be in the format "entity|rule". If the format is invalid,
+// it returns an ErrorExpression with a descriptive error message.
+func (f *SequenceExpressionFactory) CreateExpression(expression string)
Expression {
+ parts := strings.Split(expression, "|")
+ if len(parts) != 2 {
+ return &ErrorExpression{
+ err: fmt.Errorf("invalid sequence expression
format: %s, expected 'entity|rule'", expression),
+ expressionStr: expression,
+ }
+ }
+
+ seqExpr := &SequenceExpression{
+ seqGenerator: f.seqGenerator,
+ entity: strings.TrimSpace(parts[0]),
+ rule: strings.TrimSpace(parts[1]),
+ }
+ return seqExpr
+}
diff --git a/pkg/saga/statemachine/engine/invoker/invoker.go
b/pkg/saga/statemachine/engine/invoker/invoker.go
index e0cb85f4..d84798b1 100644
--- a/pkg/saga/statemachine/engine/invoker/invoker.go
+++ b/pkg/saga/statemachine/engine/invoker/invoker.go
@@ -19,11 +19,29 @@ package invoker
import (
"context"
- "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+ "encoding/json"
+ "fmt"
"reflect"
"sync"
+
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
)
+type JsonParser interface {
+ Unmarshal(data []byte, v any) error
+ Marshal(v any) ([]byte, error)
+}
+
+type DefaultJsonParser struct{}
+
+func (p *DefaultJsonParser) Unmarshal(data []byte, v any) error {
+ return json.Unmarshal(data, v)
+}
+
+func (p *DefaultJsonParser) Marshal(v any) ([]byte, error) {
+ return json.Marshal(v)
+}
+
type ScriptInvokerManager interface {
}
@@ -45,6 +63,146 @@ type ServiceInvokerManagerImpl struct {
mutex sync.Mutex
}
+type LocalServiceInvoker struct {
+ serviceRegistry map[string]interface{}
+ methodCache map[string]*reflect.Method
+ jsonParser JsonParser
+ mutex sync.RWMutex
+}
+
+func NewLocalServiceInvoker() *LocalServiceInvoker {
+ return &LocalServiceInvoker{
+ serviceRegistry: make(map[string]interface{}),
+ methodCache: make(map[string]*reflect.Method),
+ jsonParser: &DefaultJsonParser{},
+ }
+}
+
+func (l *LocalServiceInvoker) RegisterService(serviceName string, instance
interface{}) {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+ l.serviceRegistry[serviceName] = instance
+}
+
+func (l *LocalServiceInvoker) Invoke(ctx context.Context, input []any, service
state.ServiceTaskState) ([]reflect.Value, error) {
+ serviceName := service.ServiceName()
+ instance, exists := l.serviceRegistry[serviceName]
+ if !exists {
+ return nil, fmt.Errorf("service %s not registered", serviceName)
+ }
+
+ methodName := service.ServiceMethod()
+ method, err := l.getMethod(serviceName, methodName,
service.ParameterTypes())
+ if err != nil {
+ return nil, err
+ }
+
+ params, err := l.resolveParameters(input, method.Type)
+ if err != nil {
+ return nil, err
+ }
+
+ return l.invokeMethod(instance, method, params), nil
+}
+
+func (l *LocalServiceInvoker) resolveMethod(key, serviceName, methodName
string) (*reflect.Method, error) {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+
+ if cachedMethod, ok := l.methodCache[key]; ok {
+ return cachedMethod, nil
+ }
+
+ instance, exists := l.serviceRegistry[serviceName]
+ if !exists {
+ return nil, fmt.Errorf("service %s not found", serviceName)
+ }
+
+ objType := reflect.TypeOf(instance)
+ method, ok := objType.MethodByName(methodName)
+ if !ok {
+ return nil, fmt.Errorf("method %s not found in service %s",
methodName, serviceName)
+ }
+
+ l.methodCache[key] = &method
+ return &method, nil
+}
+
+func (l *LocalServiceInvoker) getMethod(serviceName, methodName string,
paramTypes []string) (*reflect.Method, error) {
+ key := fmt.Sprintf("%s.%s", serviceName, methodName)
+
+ l.mutex.RLock()
+ if method, ok := l.methodCache[key]; ok {
+ l.mutex.RUnlock()
+ return method, nil
+ }
+ l.mutex.RUnlock()
+
+ return l.resolveMethod(key, serviceName, methodName)
+}
+
+func (l *LocalServiceInvoker) resolveParameters(input []any, methodType
reflect.Type) ([]reflect.Value, error) {
+ params := make([]reflect.Value, methodType.NumIn())
+ for i := 0; i < methodType.NumIn(); i++ {
+ paramType := methodType.In(i)
+ if i >= len(input) {
+ params[i] = reflect.Zero(paramType)
+ continue
+ }
+
+ converted, err := l.convertParam(input[i], paramType)
+ if err != nil {
+ return nil, err
+ }
+ params[i] = reflect.ValueOf(converted)
+ }
+ return params, nil
+}
+
+func (l *LocalServiceInvoker) convertParam(value any, targetType reflect.Type)
(any, error) {
+ if targetType.Kind() == reflect.Ptr {
+ targetType = targetType.Elem()
+ value = reflect.ValueOf(value).Interface()
+ }
+
+ if targetType.Kind() == reflect.Int && reflect.TypeOf(value).Kind() ==
reflect.Float64 {
+ return int(value.(float64)), nil
+ } else if targetType == reflect.TypeOf("") &&
reflect.TypeOf(value).Kind() == reflect.Int {
+ return fmt.Sprintf("%d", value), nil
+ }
+
+ if targetType.Kind() == reflect.Struct {
+ jsonData, err := l.jsonParser.Marshal(value)
+ if err != nil {
+ return nil, err
+ }
+ instance := reflect.New(targetType).Interface()
+ if err := l.jsonParser.Unmarshal(jsonData, instance); err !=
nil {
+ return nil, err
+ }
+ return instance, nil
+ }
+
+ return value, nil
+}
+
+func (l *LocalServiceInvoker) invokeMethod(instance interface{}, method
*reflect.Method, params []reflect.Value) []reflect.Value {
+ instanceValue := reflect.ValueOf(instance)
+ if method.Func.IsValid() {
+ allParams := append([]reflect.Value{instanceValue}, params...)
+ return method.Func.Call(allParams)
+ }
+ return nil
+}
+
+func (l *LocalServiceInvoker) Close(ctx context.Context) error {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+ l.serviceRegistry = nil
+ l.methodCache = nil
+ return nil
+}
+
func NewServiceInvokerManagerImpl() *ServiceInvokerManagerImpl {
return &ServiceInvokerManagerImpl{
invokers: make(map[string]ServiceInvoker),
diff --git
a/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
b/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
index 8404a3d1..c9c10480 100644
--- a/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
+++ b/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
@@ -111,7 +111,7 @@ func (s *StateLogRepositoryImpl)
GetStateMachineInstanceByBusinessKey(businessKe
return
s.stateLogStore.GetStateMachineInstanceByBusinessKey(businessKey, tenantId)
}
-func (s *StateLogRepositoryImpl) QueryStateMachineInstanceByParentId(parentId
string) ([]statelang.StateMachineInstance, error) {
+func (s *StateLogRepositoryImpl) GetStateMachineInstanceByParentId(parentId
string) ([]statelang.StateMachineInstance, error) {
if s.stateLogStore == nil {
return nil, errors.New("stateLogStore is not initialized")
}
@@ -125,12 +125,12 @@ func (s *StateLogRepositoryImpl)
GetStateInstance(stateInstanceId, machineInstId
return s.stateLogStore.GetStateInstance(stateInstanceId, machineInstId)
}
-func (s *StateLogRepositoryImpl)
QueryStateInstanceListByMachineInstanceId(stateMachineInstanceId string)
([]statelang.StateInstance, error) {
+
+func (s *StateLogRepositoryImpl)
GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string)
([]statelang.StateInstance, error) {
if s.stateLogStore == nil {
return nil, errors.New("stateLogStore is not initialized")
}
return
s.stateLogStore.GetStateInstanceListByMachineInstanceId(stateMachineInstanceId)
-
}
func (s *StateLogRepositoryImpl) SetStateLogStore(stateLogStore
store.StateLogStore) {
diff --git a/pkg/saga/statemachine/engine/repo/statemachine_store.go
b/pkg/saga/statemachine/engine/repo/statemachine_store.go
index 5a426b17..c0360a5a 100644
--- a/pkg/saga/statemachine/engine/repo/statemachine_store.go
+++ b/pkg/saga/statemachine/engine/repo/statemachine_store.go
@@ -25,7 +25,7 @@ import (
type StateLogRepository interface {
GetStateMachineInstance(stateMachineInstanceId string)
(statelang.StateMachineInstance, error)
- GetStateMachineInstanceByBusinessKey(businessKey string, tenantId
string) (statelang.StateInstance, error)
+ GetStateMachineInstanceByBusinessKey(businessKey string, tenantId
string) (statelang.StateMachineInstance, error)
GetStateMachineInstanceByParentId(parentId string)
([]statelang.StateMachineInstance, error)
diff --git a/pkg/saga/statemachine/engine/statemachine_config.go
b/pkg/saga/statemachine/engine/statemachine_config.go
index bd24fbc9..04db45cd 100644
--- a/pkg/saga/statemachine/engine/statemachine_config.go
+++ b/pkg/saga/statemachine/engine/statemachine_config.go
@@ -18,6 +18,7 @@
package engine
import (
+ "github.com/seata/seata-go/pkg/saga/statemachine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/repo"
@@ -36,7 +37,7 @@ type StateMachineConfig interface {
StateLangStore() store.StateLangStore
- ExpressionFactoryManager() expr.ExpressionFactoryManager
+ ExpressionFactoryManager() *expr.ExpressionFactoryManager
ExpressionResolver() expr.ExpressionResolver
@@ -54,11 +55,23 @@ type StateMachineConfig interface {
CharSet() string
- DefaultTenantId() string
+ GetDefaultTenantId() string
- TransOperationTimeout() int
+ GetTransOperationTimeout() int
- ServiceInvokeTimeout() int
+ GetServiceInvokeTimeout() int
ComponentLock() *sync.Mutex
+
+ RegisterStateMachineDef(resources []string) error
+
+ RegisterExpressionFactory(expressionType string, factory
expr.ExpressionFactory)
+
+ RegisterServiceInvoker(serviceType string, invoker
invoker.ServiceInvoker)
+
+ GetStateMachineDefinition(name string) *statemachine.StateMachineObject
+
+ GetExpressionFactory(expressionType string) expr.ExpressionFactory
+
+ GetServiceInvoker(serviceType string) invoker.ServiceInvoker
}
diff --git a/pkg/saga/statemachine/process_ctrl/event_bus.go
b/pkg/saga/statemachine/process_ctrl/event_bus.go
index 49c282ef..51a40dba 100644
--- a/pkg/saga/statemachine/process_ctrl/event_bus.go
+++ b/pkg/saga/statemachine/process_ctrl/event_bus.go
@@ -128,3 +128,19 @@ func (a AsyncEventBus) Offer(ctx context.Context, event
Event) (bool, error) {
return true, nil
}
+
+func NewDirectEventBus() *DirectEventBus {
+ return &DirectEventBus{
+ BaseEventBus: BaseEventBus{
+ eventConsumerList: make([]EventConsumer, 0),
+ },
+ }
+}
+
+func NewAsyncEventBus(ctx context.Context, queueSize int, workerCount int)
*AsyncEventBus {
+ return &AsyncEventBus{
+ BaseEventBus: BaseEventBus{
+ eventConsumerList: make([]EventConsumer, 0),
+ },
+ }
+}
diff --git a/pkg/saga/statemachine/process_ctrl/event_consumer.go
b/pkg/saga/statemachine/process_ctrl/event_consumer.go
index 5058b1f3..68dd90ae 100644
--- a/pkg/saga/statemachine/process_ctrl/event_consumer.go
+++ b/pkg/saga/statemachine/process_ctrl/event_consumer.go
@@ -20,7 +20,6 @@ package process_ctrl
import (
"context"
"fmt"
- "github.com/pkg/errors"
)
type EventConsumer interface {
@@ -44,8 +43,8 @@ func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event)
error {
processContext, ok := event.(ProcessContext)
- if !ok {
- return errors.New(fmt.Sprint("event %T is illegal, required
process_ctrl.ProcessContext", event))
+ if !ok {
+ return fmt.Errorf("event %T is illegal, required
process_ctrl.ProcessContext", event)
}
return p.processController.Process(ctx, processContext)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]