This is an automated email from the ASF dual-hosted git repository.
zfeng pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/saga by this push:
new 17237abd Resolve Merge Conflicts, Add Persistence Initialization, and
Optimize Tests (#847)
17237abd is described below
commit 17237abd3a2804be86a69908982186747526a30b
Author: flypiggy <[email protected]>
AuthorDate: Sat Jul 5 15:54:48 2025 +0800
Resolve Merge Conflicts, Add Persistence Initialization, and Optimize Tests
(#847)
* fix-conflict&add-store-component-init&fix-test
* Update process_ctrl_statemachine_engine.go
---
.../engine/config/default_statemachine_config.go | 147 ++++++++++-----------
.../config/default_statemachine_config_test.go | 35 +++++
pkg/saga/statemachine/engine/config/noop_store.go | 86 ++++++++++++
.../engine/{core => config}/testdata/invalid.json | 0
.../{core => config}/testdata/invalid.json.comment | 0
.../engine/{core => config}/testdata/invalid.yaml | 0
.../{core => config}/testdata/order_saga.json | 0
.../testdata/order_saga.json.comment | 0
.../{core => config}/testdata/order_saga.yaml | 0
.../core/process_ctrl_statemachine_engine.go | 6 +-
.../engine/repo/repository/state_log_repository.go | 15 +--
.../process_ctrl/bussiness_processor.go | 7 +
.../statemachine/process_ctrl/event_consumer.go | 8 +-
13 files changed, 215 insertions(+), 89 deletions(-)
diff --git a/pkg/saga/statemachine/engine/config/default_statemachine_config.go
b/pkg/saga/statemachine/engine/config/default_statemachine_config.go
index 78943e74..07e4f953 100644
--- a/pkg/saga/statemachine/engine/config/default_statemachine_config.go
+++ b/pkg/saga/statemachine/engine/config/default_statemachine_config.go
@@ -21,14 +21,15 @@ import (
"context"
"encoding/json"
"fmt"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/repo/repository"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/strategy"
+ "gopkg.in/yaml.v3"
"log"
"os"
"path/filepath"
"strings"
"sync"
- "gopkg.in/yaml.v3"
-
"github.com/seata/seata-go/pkg/saga/statemachine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
@@ -36,9 +37,8 @@ import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine/repo"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl"
- "github.com/seata/seata-go/pkg/saga/statemachine/store"
- "sync"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/parser"
+ "github.com/seata/seata-go/pkg/saga/statemachine/store"
)
const (
@@ -66,11 +66,11 @@ type DefaultStateMachineConfig struct {
stateMachineDefs map[string]*statemachine.StateMachineObject
// Components
- processController ProcessController
+ processController process_ctrl.ProcessController
// Event Bus
- syncEventBus EventBus
- asyncEventBus EventBus
+ syncEventBus process_ctrl.EventBus
+ asyncEventBus process_ctrl.EventBus
// Event publisher
syncProcessCtrlEventPublisher process_ctrl.EventPublisher
@@ -120,15 +120,15 @@ func (c *DefaultStateMachineConfig)
SetDefaultTenantId(defaultTenantId string) {
c.defaultTenantId = defaultTenantId
}
-func (c *DefaultStateMachineConfig) SetSyncEventBus(syncEventBus EventBus) {
+func (c *DefaultStateMachineConfig) SetSyncEventBus(syncEventBus
process_ctrl.EventBus) {
c.syncEventBus = syncEventBus
}
-func (c *DefaultStateMachineConfig) SetAsyncEventBus(asyncEventBus EventBus) {
+func (c *DefaultStateMachineConfig) SetAsyncEventBus(asyncEventBus
process_ctrl.EventBus) {
c.asyncEventBus = asyncEventBus
}
-func (c *DefaultStateMachineConfig)
SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher EventPublisher) {
+func (c *DefaultStateMachineConfig)
SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher
process_ctrl.EventPublisher) {
c.syncProcessCtrlEventPublisher = syncProcessCtrlEventPublisher
}
@@ -208,15 +208,15 @@ func (c *DefaultStateMachineConfig)
StatusDecisionStrategy() engine.StatusDecisi
return c.statusDecisionStrategy
}
-func (c *DefaultStateMachineConfig) SyncEventBus() EventBus {
+func (c *DefaultStateMachineConfig) SyncEventBus() process_ctrl.EventBus {
return c.syncEventBus
}
-func (c *DefaultStateMachineConfig) AsyncEventBus() EventBus {
+func (c *DefaultStateMachineConfig) AsyncEventBus() process_ctrl.EventBus {
return c.asyncEventBus
}
-func (c *DefaultStateMachineConfig) EventPublisher() EventPublisher {
+func (c *DefaultStateMachineConfig) EventPublisher()
process_ctrl.EventPublisher {
return c.syncProcessCtrlEventPublisher
}
@@ -407,21 +407,19 @@ func (c *DefaultStateMachineConfig)
registerEventConsumers() error {
return fmt.Errorf("ProcessController is not initialized")
}
- pcImpl, ok := c.processController.(*ProcessControllerImpl)
+ pcImpl, ok := c.processController.(*process_ctrl.ProcessControllerImpl)
if !ok {
return fmt.Errorf("ProcessController is not an instance of
ProcessControllerImpl")
}
- if pcImpl.businessProcessor == nil {
+ if pcImpl.BusinessProcessor() == nil {
return fmt.Errorf("BusinessProcessor in ProcessController is
not initialized")
}
- processCtrlConsumer := &ProcessCtrlEventConsumer{
- processController: c.processController,
- }
+ consumer :=
process_ctrl.NewProcessCtrlEventConsumer(c.processController)
- c.syncEventBus.RegisterEventConsumer(processCtrlConsumer)
- c.asyncEventBus.RegisterEventConsumer(processCtrlConsumer)
+ c.syncEventBus.RegisterEventConsumer(consumer)
+ c.asyncEventBus.RegisterEventConsumer(consumer)
return nil
}
@@ -587,45 +585,16 @@ func (c *DefaultStateMachineConfig)
EvaluateExpression(expressionStr string, con
return result, nil
}
-func NewDefaultBusinessProcessor() *DefaultBusinessProcessor {
- return &DefaultBusinessProcessor{
- processHandlers: make(map[string]ProcessHandler),
- routerHandlers: make(map[string]RouterHandler),
- }
-}
-
func NewDefaultStateMachineConfig(opts ...Option) *DefaultStateMachineConfig {
ctx := context.Background()
- defaultBP := NewDefaultBusinessProcessor()
-
- // stateMachineResources for development only; production uses env
vars/config files.
- stateMachineResources := []string{
- "testdata/saga/statelang/**/*.json",
- "testdata/saga/statelang/**/*.yaml",
- }
-
- if envPaths := os.Getenv("SEATA_STATE_MACHINE_RESOURCES"); envPaths !=
"" {
- paths := strings.Split(envPaths, ",")
- filteredPaths := make([]string, 0, len(paths))
- for _, p := range paths {
- p = strings.TrimSpace(p)
- if p != "" {
- filteredPaths = append(filteredPaths, p)
- }
- }
- if len(filteredPaths) > 0 {
- stateMachineResources = filteredPaths
- }
- }
+ defaultBP := process_ctrl.NewBusinessProcessor()
c := &DefaultStateMachineConfig{
- transOperationTimeout: DefaultTransOperTimeout,
- serviceInvokeTimeout: DefaultServiceInvokeTimeout,
- charset: "UTF-8",
- defaultTenantId: "000001",
-
- stateMachineResources: stateMachineResources,
-
+ transOperationTimeout: DefaultTransOperTimeout,
+ serviceInvokeTimeout: DefaultServiceInvokeTimeout,
+ charset: "UTF-8",
+ defaultTenantId: "000001",
+ stateMachineResources: parseEnvResources(),
sagaRetryPersistModeUpdate:
DefaultClientSagaRetryPersistModeUpdate,
sagaCompensatePersistModeUpdate:
DefaultClientSagaCompensatePersistModeUpdate,
sagaBranchRegisterEnable:
DefaultClientSagaBranchRegisterEnable,
@@ -633,21 +602,31 @@ func NewDefaultStateMachineConfig(opts ...Option)
*DefaultStateMachineConfig {
stateMachineDefs:
make(map[string]*statemachine.StateMachineObject),
componentLock: &sync.Mutex{},
seqGenerator: sequence.NewUUIDSeqGenerator(),
-
- statusDecisionStrategy: NewDefaultStatusDecisionStrategy(),
- processController: &ProcessControllerImpl{
- businessProcessor: defaultBP,
- },
-
- syncEventBus: NewDirectEventBus(),
- asyncEventBus: NewAsyncEventBus(ctx, 1000, 5),
+ statusDecisionStrategy:
strategy.NewDefaultStatusDecisionStrategy(),
+ processController: func() process_ctrl.ProcessController {
+ pc := &process_ctrl.ProcessControllerImpl{}
+ pc.SetBusinessProcessor(defaultBP)
+ return pc
+ }(),
+ syncEventBus: process_ctrl.NewDirectEventBus(),
+ asyncEventBus: process_ctrl.NewAsyncEventBus(ctx, 1000, 5),
syncProcessCtrlEventPublisher: nil,
asyncProcessCtrlEventPublisher: nil,
+
+ stateLogStore: &NoopStateLogStore{},
+ stateLangStore: &NoopStateLangStore{},
}
- c.syncProcessCtrlEventPublisher =
NewProcessCtrlEventPublisher(c.syncEventBus)
- c.asyncProcessCtrlEventPublisher =
NewProcessCtrlEventPublisher(c.asyncEventBus)
+ c.stateMachineRepository = repository.GetStateMachineRepositoryImpl()
+ c.stateLogRepository = repository.NewStateLogRepositoryImpl()
+
+ c.syncProcessCtrlEventPublisher =
process_ctrl.NewProcessCtrlEventPublisher(c.syncEventBus)
+ c.asyncProcessCtrlEventPublisher =
process_ctrl.NewProcessCtrlEventPublisher(c.asyncEventBus)
+
+ for _, opt := range opts {
+ opt(c)
+ }
if err := c.LoadConfig("config.yaml"); err == nil {
log.Printf("Successfully loaded config from config.yaml")
@@ -655,16 +634,26 @@ func NewDefaultStateMachineConfig(opts ...Option)
*DefaultStateMachineConfig {
log.Printf("Failed to load config file (using default/env
values): %v", err)
}
- for _, opt := range opts {
- opt(c)
- }
-
return c
}
+func parseEnvResources() []string {
+ if env := os.Getenv("SEATA_STATE_MACHINE_RESOURCES"); env != "" {
+ parts := strings.Split(env, ",")
+ var res []string
+ for _, p := range parts {
+ if p = strings.TrimSpace(p); p != "" {
+ res = append(res, p)
+ }
+ }
+ return res
+ }
+ return nil
+}
+
type Option func(*DefaultStateMachineConfig)
-func WithStatusDecisionStrategy(strategy StatusDecisionStrategy) Option {
+func WithStatusDecisionStrategy(strategy engine.StatusDecisionStrategy) Option
{
return func(c *DefaultStateMachineConfig) {
c.statusDecisionStrategy = strategy
}
@@ -676,15 +665,19 @@ func WithSeqGenerator(gen sequence.SeqGenerator) Option {
}
}
-func WithProcessController(ctrl ProcessController) Option {
+func WithProcessController(ctrl process_ctrl.ProcessController) Option {
return func(c *DefaultStateMachineConfig) {
c.processController = ctrl
}
}
-func WithBusinessProcessor(bp BusinessProcessor) Option {
+func WithBusinessProcessor(bp process_ctrl.BusinessProcessor) Option {
return func(c *DefaultStateMachineConfig) {
- c.processController.(*ProcessControllerImpl).businessProcessor
= bp
+ if pc, ok :=
c.processController.(*process_ctrl.ProcessControllerImpl); ok {
+ pc.SetBusinessProcessor(bp)
+ } else {
+ log.Printf("ProcessController is not of type
*ProcessControllerImpl, unable to set BusinessProcessor")
+ }
}
}
@@ -696,25 +689,25 @@ func WithStateMachineResources(paths []string) Option {
}
}
-func WithStateLogRepository(logRepo StateLogRepository) Option {
+func WithStateLogRepository(logRepo repo.StateLogRepository) Option {
return func(c *DefaultStateMachineConfig) {
c.stateLogRepository = logRepo
}
}
-func WithStateLogStore(logStore StateLogStore) Option {
+func WithStateLogStore(logStore store.StateLogStore) Option {
return func(c *DefaultStateMachineConfig) {
c.stateLogStore = logStore
}
}
-func WithStateLangStore(langStore StateLangStore) Option {
+func WithStateLangStore(langStore store.StateLangStore) Option {
return func(c *DefaultStateMachineConfig) {
c.stateLangStore = langStore
}
}
-func WithStateMachineRepository(machineRepo StateMachineRepository) Option {
+func WithStateMachineRepository(machineRepo repo.StateMachineRepository)
Option {
return func(c *DefaultStateMachineConfig) {
c.stateMachineRepository = machineRepo
}
diff --git
a/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
b/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
index 82e5c427..f3ad80ca 100644
--- a/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
+++ b/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
@@ -20,6 +20,7 @@ package config
import (
"errors"
"io"
+ "os"
"path/filepath"
"testing"
@@ -28,6 +29,8 @@ import (
)
func TestDefaultStateMachineConfig_LoadValidJSON(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
testFile := filepath.Join("testdata", "order_saga.json")
@@ -43,6 +46,8 @@ func TestDefaultStateMachineConfig_LoadValidJSON(t
*testing.T) {
}
func TestDefaultStateMachineConfig_LoadValidYAML(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
testFile := filepath.Join("testdata", "order_saga.yaml")
@@ -54,6 +59,8 @@ func TestDefaultStateMachineConfig_LoadValidYAML(t
*testing.T) {
}
func TestLoadNonExistentFile(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
err := config.LoadConfig("non_existent.json")
assert.Error(t, err, "Loading a non-existent file should report an
error")
@@ -61,6 +68,8 @@ func TestLoadNonExistentFile(t *testing.T) {
}
func TestGetStateMachineDefinition_Exists(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
_ = config.LoadConfig(filepath.Join("testdata", "order_saga.json"))
@@ -70,12 +79,16 @@ func TestGetStateMachineDefinition_Exists(t *testing.T) {
}
func TestGetNonExistentStateMachine(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
smo := config.GetStateMachineDefinition("NonExistent")
assert.Nil(t, smo, "An unloaded state machine should return nil")
}
func TestLoadDuplicateStateMachine(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
testFile := filepath.Join("testdata", "order_saga.json")
@@ -88,6 +101,8 @@ func TestLoadDuplicateStateMachine(t *testing.T) {
}
func TestRuntimeConfig_OverrideDefaults(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
assert.Equal(t, "UTF-8", config.charset, "The default character set
should be UTF-8")
@@ -102,6 +117,8 @@ func TestRuntimeConfig_OverrideDefaults(t *testing.T) {
}
func TestGetDefaultExpressionFactory(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
err := config.Init()
@@ -115,6 +132,8 @@ func TestGetDefaultExpressionFactory(t *testing.T) {
}
func TestGetServiceInvoker(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
if err := config.Init(); err != nil {
t.Fatalf("init config failed: %v", err)
@@ -127,6 +146,8 @@ func TestGetServiceInvoker(t *testing.T) {
}
func TestLoadConfig_InvalidJSON(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
testFile := filepath.Join("testdata", "invalid.json")
@@ -136,6 +157,8 @@ func TestLoadConfig_InvalidJSON(t *testing.T) {
}
func TestLoadConfig_InvalidYAML(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
testFile := filepath.Join("testdata", "invalid.yaml")
@@ -145,6 +168,8 @@ func TestLoadConfig_InvalidYAML(t *testing.T) {
}
func TestRegisterStateMachineDef_Fail(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
invalidResource := []string{"invalid_path.json"}
@@ -154,6 +179,8 @@ func TestRegisterStateMachineDef_Fail(t *testing.T) {
}
func TestInit_ExpressionFactoryManagerNil(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
config.expressionFactoryManager = nil
@@ -162,6 +189,8 @@ func TestInit_ExpressionFactoryManagerNil(t *testing.T) {
}
func TestInit_ServiceInvokerManagerNil(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
config.serviceInvokerManager = nil
@@ -170,6 +199,8 @@ func TestInit_ServiceInvokerManagerNil(t *testing.T) {
}
func TestInit_StateMachineRepositoryNil(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
config.stateMachineRepository = nil
@@ -178,6 +209,8 @@ func TestInit_StateMachineRepositoryNil(t *testing.T) {
}
func TestApplyRuntimeConfig_BoundaryValues(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
customConfig := &ConfigFileParams{
TransOperationTimeout: 1,
@@ -220,6 +253,8 @@ func (m *TestStateMachineRepositoryMock)
RegistryStateMachineByReader(reader io.
}
func TestRegisterStateMachineDef_RepositoryError(t *testing.T) {
+ os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
config := NewDefaultStateMachineConfig()
config.stateMachineRepository = &TestStateMachineRepositoryMock{}
resource := []string{filepath.Join("testdata", "order_saga.json")}
diff --git a/pkg/saga/statemachine/engine/config/noop_store.go
b/pkg/saga/statemachine/engine/config/noop_store.go
new file mode 100644
index 00000000..f40f6278
--- /dev/null
+++ b/pkg/saga/statemachine/engine/config/noop_store.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+// NoopStateLogStore is a no-op implementation of StateLogStore for
out-of-the-box scenarios.
+// All methods perform no actual operations and return nil or zero values to
ensure validation passes.
+type NoopStateLogStore struct{}
+
+func (s *NoopStateLogStore) RecordStateMachineStarted(ctx context.Context,
machineInstance statelang.StateMachineInstance, pc process_ctrl.ProcessContext)
error {
+ return nil
+}
+
+func (s *NoopStateLogStore) RecordStateMachineFinished(ctx context.Context,
machineInstance statelang.StateMachineInstance, pc process_ctrl.ProcessContext)
error {
+ return nil
+}
+
+func (s *NoopStateLogStore) RecordStateMachineRestarted(ctx context.Context,
machineInstance statelang.StateMachineInstance, pc process_ctrl.ProcessContext)
error {
+ return nil
+}
+
+func (s *NoopStateLogStore) RecordStateStarted(ctx context.Context,
stateInstance statelang.StateInstance, pc process_ctrl.ProcessContext) error {
+ return nil
+}
+
+func (s *NoopStateLogStore) RecordStateFinished(ctx context.Context,
stateInstance statelang.StateInstance, pc process_ctrl.ProcessContext) error {
+ return nil
+}
+
+func (s *NoopStateLogStore) GetStateMachineInstance(stateMachineInstanceId
string) (statelang.StateMachineInstance, error) {
+ return nil, nil
+}
+
+func (s *NoopStateLogStore) GetStateMachineInstanceByBusinessKey(businessKey
string, tenantId string) (statelang.StateMachineInstance, error) {
+ return nil, nil
+}
+
+func (s *NoopStateLogStore) GetStateMachineInstanceByParentId(parentId string)
([]statelang.StateMachineInstance, error) {
+ return nil, nil
+}
+
+func (s *NoopStateLogStore) GetStateInstance(stateInstanceId string,
stateMachineInstanceId string) (statelang.StateInstance, error) {
+ return nil, nil
+}
+
+func (s *NoopStateLogStore)
GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string)
([]statelang.StateInstance, error) {
+ return nil, nil
+}
+
+func (s *NoopStateLogStore) ClearUp(pc process_ctrl.ProcessContext) {
+ // no-op
+}
+
+type NoopStateLangStore struct{}
+
+func (s *NoopStateLangStore) GetStateMachineById(stateMachineId string)
(statelang.StateMachine, error) {
+ return nil, nil
+}
+
+func (s *NoopStateLangStore) GetLastVersionStateMachine(stateMachineName
string, tenantId string) (statelang.StateMachine, error) {
+ return nil, nil
+}
+
+func (s *NoopStateLangStore) StoreStateMachine(stateMachine
statelang.StateMachine) error {
+ return nil
+}
diff --git a/pkg/saga/statemachine/engine/core/testdata/invalid.json
b/pkg/saga/statemachine/engine/config/testdata/invalid.json
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/invalid.json
rename to pkg/saga/statemachine/engine/config/testdata/invalid.json
diff --git a/pkg/saga/statemachine/engine/core/testdata/invalid.json.comment
b/pkg/saga/statemachine/engine/config/testdata/invalid.json.comment
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/invalid.json.comment
rename to pkg/saga/statemachine/engine/config/testdata/invalid.json.comment
diff --git a/pkg/saga/statemachine/engine/core/testdata/invalid.yaml
b/pkg/saga/statemachine/engine/config/testdata/invalid.yaml
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/invalid.yaml
rename to pkg/saga/statemachine/engine/config/testdata/invalid.yaml
diff --git a/pkg/saga/statemachine/engine/core/testdata/order_saga.json
b/pkg/saga/statemachine/engine/config/testdata/order_saga.json
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/order_saga.json
rename to pkg/saga/statemachine/engine/config/testdata/order_saga.json
diff --git a/pkg/saga/statemachine/engine/core/testdata/order_saga.json.comment
b/pkg/saga/statemachine/engine/config/testdata/order_saga.json.comment
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/order_saga.json.comment
rename to pkg/saga/statemachine/engine/config/testdata/order_saga.json.comment
diff --git a/pkg/saga/statemachine/engine/core/testdata/order_saga.yaml
b/pkg/saga/statemachine/engine/config/testdata/order_saga.yaml
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/order_saga.yaml
rename to pkg/saga/statemachine/engine/config/testdata/order_saga.yaml
diff --git
a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
index a3e64b2a..cc51b552 100644
--- a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
+++ b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
@@ -142,7 +142,7 @@ func (p ProcessCtrlStateMachineEngine)
ReloadStateMachineInstance(ctx context.Co
func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context,
stateMachineName string, tenantId string,
businessKey string, startParams map[string]interface{}, async bool,
callback engine.CallBack) (statelang.StateMachineInstance, error) {
if tenantId == "" {
- tenantId = p.StateMachineConfig.DefaultTenantId()
+ tenantId = p.StateMachineConfig.GetDefaultTenantId()
}
stateMachineInstance, err := p.createMachineInstance(stateMachineName,
tenantId, businessKey, startParams)
@@ -286,7 +286,7 @@ func (p ProcessCtrlStateMachineEngine) forwardInternal(ctx
context.Context, stat
}
inst.SetStateName(next)
} else {
- if lastForwardState.Status() == statelang.RU &&
!pcext.IsTimeout(lastForwardState.StartedTime(),
p.StateMachineConfig.ServiceInvokeTimeout()) {
+ if lastForwardState.Status() == statelang.RU &&
!pcext.IsTimeout(lastForwardState.StartedTime(),
p.StateMachineConfig.GetServiceInvokeTimeout()) {
return nil,
exception.NewEngineExecutionException(seataErrors.OperationDenied,
fmt.Sprintf("State [%s] is running,
operation[forward] denied", lastForwardState.Name()), nil)
}
@@ -610,7 +610,7 @@ func (p ProcessCtrlStateMachineEngine) checkStatus(ctx
context.Context, stateMac
}
if stateMachineInstance.IsRunning() &&
- !pcext.IsTimeout(stateMachineInstance.UpdatedTime(),
p.StateMachineConfig.TransOperationTimeout()) {
+ !pcext.IsTimeout(stateMachineInstance.UpdatedTime(),
p.StateMachineConfig.GetTransOperationTimeout()) {
return false,
exception.NewEngineExecutionException(seataErrors.OperationDenied,
"StateMachineInstance
[id:"+stateMachineInstance.ID()+"] is running, operation["+operation+
"] denied", nil)
diff --git
a/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
b/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
index c9c10480..316e7c2a 100644
--- a/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
+++ b/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
@@ -23,22 +23,22 @@ import (
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/saga/statemachine/store"
+ "sync"
)
var (
- stateLogRepositoryImpl *StateLogRepositoryImpl
+ stateLogRepositoryImpl *StateLogRepositoryImpl
+ onceStateLogRepositoryImpl sync.Once
)
type StateLogRepositoryImpl struct {
stateLogStore store.StateLogStore
}
-func NewStateLogRepositoryImpl(stateLogStore store.StateLogStore)
*StateLogRepositoryImpl {
- if stateLogRepositoryImpl == nil {
- stateLogRepositoryImpl = &StateLogRepositoryImpl{
- stateLogStore: stateLogStore,
- }
- }
+func NewStateLogRepositoryImpl() *StateLogRepositoryImpl {
+ onceStateLogRepositoryImpl.Do(func() {
+ stateLogRepositoryImpl = &StateLogRepositoryImpl{}
+ })
return stateLogRepositoryImpl
}
@@ -125,7 +125,6 @@ func (s *StateLogRepositoryImpl)
GetStateInstance(stateInstanceId, machineInstId
return s.stateLogStore.GetStateInstance(stateInstanceId, machineInstId)
}
-
func (s *StateLogRepositoryImpl)
GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string)
([]statelang.StateInstance, error) {
if s.stateLogStore == nil {
return nil, errors.New("stateLogStore is not initialized")
diff --git a/pkg/saga/statemachine/process_ctrl/bussiness_processor.go
b/pkg/saga/statemachine/process_ctrl/bussiness_processor.go
index 5c830d21..369972a1 100644
--- a/pkg/saga/statemachine/process_ctrl/bussiness_processor.go
+++ b/pkg/saga/statemachine/process_ctrl/bussiness_processor.go
@@ -31,6 +31,13 @@ type BusinessProcessor interface {
Route(ctx context.Context, processContext ProcessContext) error
}
+func NewBusinessProcessor() BusinessProcessor {
+ return &DefaultBusinessProcessor{
+ processHandlers: make(map[string]ProcessHandler),
+ routerHandlers: make(map[string]RouterHandler),
+ }
+}
+
type DefaultBusinessProcessor struct {
processHandlers map[string]ProcessHandler
routerHandlers map[string]RouterHandler
diff --git a/pkg/saga/statemachine/process_ctrl/event_consumer.go
b/pkg/saga/statemachine/process_ctrl/event_consumer.go
index 68dd90ae..8672624f 100644
--- a/pkg/saga/statemachine/process_ctrl/event_consumer.go
+++ b/pkg/saga/statemachine/process_ctrl/event_consumer.go
@@ -43,8 +43,14 @@ func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event)
error {
processContext, ok := event.(ProcessContext)
- if !ok {
+ if !ok {
return fmt.Errorf("event %T is illegal, required
process_ctrl.ProcessContext", event)
}
return p.processController.Process(ctx, processContext)
}
+
+func NewProcessCtrlEventConsumer(controller ProcessController) EventConsumer {
+ return &ProcessCtrlEventConsumer{
+ processController: controller,
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]