This is an automated email from the ASF dual-hosted git repository.

zfeng pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/feature/saga by this push:
     new 17237abd Resolve Merge Conflicts, Add Persistence Initialization, and Optimize Tests (#847)
17237abd is described below

commit 17237abd3a2804be86a69908982186747526a30b
Author: flypiggy <[email protected]>
AuthorDate: Sat Jul 5 15:54:48 2025 +0800

    Resolve Merge Conflicts, Add Persistence Initialization, and Optimize Tests (#847)
    
    * fix-conflict&add-store-component-init&fix-test
    
    * Update process_ctrl_statemachine_engine.go
---
 .../engine/config/default_statemachine_config.go   | 147 ++++++++++-----------
 .../config/default_statemachine_config_test.go     |  35 +++++
 pkg/saga/statemachine/engine/config/noop_store.go  |  86 ++++++++++++
 .../engine/{core => config}/testdata/invalid.json  |   0
 .../{core => config}/testdata/invalid.json.comment |   0
 .../engine/{core => config}/testdata/invalid.yaml  |   0
 .../{core => config}/testdata/order_saga.json      |   0
 .../testdata/order_saga.json.comment               |   0
 .../{core => config}/testdata/order_saga.yaml      |   0
 .../core/process_ctrl_statemachine_engine.go       |   6 +-
 .../engine/repo/repository/state_log_repository.go |  15 +--
 .../process_ctrl/bussiness_processor.go            |   7 +
 .../statemachine/process_ctrl/event_consumer.go    |   8 +-
 13 files changed, 215 insertions(+), 89 deletions(-)

diff --git a/pkg/saga/statemachine/engine/config/default_statemachine_config.go 
b/pkg/saga/statemachine/engine/config/default_statemachine_config.go
index 78943e74..07e4f953 100644
--- a/pkg/saga/statemachine/engine/config/default_statemachine_config.go
+++ b/pkg/saga/statemachine/engine/config/default_statemachine_config.go
@@ -21,14 +21,15 @@ import (
        "context"
        "encoding/json"
        "fmt"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/repo/repository"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/strategy"
+       "gopkg.in/yaml.v3"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"
 
-       "gopkg.in/yaml.v3"
-
        "github.com/seata/seata-go/pkg/saga/statemachine"
        "github.com/seata/seata-go/pkg/saga/statemachine/engine"
        "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
@@ -36,9 +37,8 @@ import (
        "github.com/seata/seata-go/pkg/saga/statemachine/engine/repo"
        "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
        "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl"
-       "github.com/seata/seata-go/pkg/saga/statemachine/store"
-       "sync"
        "github.com/seata/seata-go/pkg/saga/statemachine/statelang/parser"
+       "github.com/seata/seata-go/pkg/saga/statemachine/store"
 )
 
 const (
@@ -66,11 +66,11 @@ type DefaultStateMachineConfig struct {
        stateMachineDefs map[string]*statemachine.StateMachineObject
 
        // Components
-       processController ProcessController
+       processController process_ctrl.ProcessController
 
        // Event Bus
-       syncEventBus  EventBus
-       asyncEventBus EventBus
+       syncEventBus  process_ctrl.EventBus
+       asyncEventBus process_ctrl.EventBus
 
        // Event publisher
        syncProcessCtrlEventPublisher  process_ctrl.EventPublisher
@@ -120,15 +120,15 @@ func (c *DefaultStateMachineConfig) 
SetDefaultTenantId(defaultTenantId string) {
        c.defaultTenantId = defaultTenantId
 }
 
-func (c *DefaultStateMachineConfig) SetSyncEventBus(syncEventBus EventBus) {
+func (c *DefaultStateMachineConfig) SetSyncEventBus(syncEventBus 
process_ctrl.EventBus) {
        c.syncEventBus = syncEventBus
 }
 
-func (c *DefaultStateMachineConfig) SetAsyncEventBus(asyncEventBus EventBus) {
+func (c *DefaultStateMachineConfig) SetAsyncEventBus(asyncEventBus 
process_ctrl.EventBus) {
        c.asyncEventBus = asyncEventBus
 }
 
-func (c *DefaultStateMachineConfig) 
SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher EventPublisher) {
+func (c *DefaultStateMachineConfig) 
SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher 
process_ctrl.EventPublisher) {
        c.syncProcessCtrlEventPublisher = syncProcessCtrlEventPublisher
 }
 
@@ -208,15 +208,15 @@ func (c *DefaultStateMachineConfig) 
StatusDecisionStrategy() engine.StatusDecisi
        return c.statusDecisionStrategy
 }
 
-func (c *DefaultStateMachineConfig) SyncEventBus() EventBus {
+func (c *DefaultStateMachineConfig) SyncEventBus() process_ctrl.EventBus {
        return c.syncEventBus
 }
 
-func (c *DefaultStateMachineConfig) AsyncEventBus() EventBus {
+func (c *DefaultStateMachineConfig) AsyncEventBus() process_ctrl.EventBus {
        return c.asyncEventBus
 }
 
-func (c *DefaultStateMachineConfig) EventPublisher() EventPublisher {
+func (c *DefaultStateMachineConfig) EventPublisher() 
process_ctrl.EventPublisher {
        return c.syncProcessCtrlEventPublisher
 }
 
@@ -407,21 +407,19 @@ func (c *DefaultStateMachineConfig) 
registerEventConsumers() error {
                return fmt.Errorf("ProcessController is not initialized")
        }
 
-       pcImpl, ok := c.processController.(*ProcessControllerImpl)
+       pcImpl, ok := c.processController.(*process_ctrl.ProcessControllerImpl)
        if !ok {
                return fmt.Errorf("ProcessController is not an instance of 
ProcessControllerImpl")
        }
 
-       if pcImpl.businessProcessor == nil {
+       if pcImpl.BusinessProcessor() == nil {
                return fmt.Errorf("BusinessProcessor in ProcessController is 
not initialized")
        }
 
-       processCtrlConsumer := &ProcessCtrlEventConsumer{
-               processController: c.processController,
-       }
+       consumer := 
process_ctrl.NewProcessCtrlEventConsumer(c.processController)
 
-       c.syncEventBus.RegisterEventConsumer(processCtrlConsumer)
-       c.asyncEventBus.RegisterEventConsumer(processCtrlConsumer)
+       c.syncEventBus.RegisterEventConsumer(consumer)
+       c.asyncEventBus.RegisterEventConsumer(consumer)
 
        return nil
 }
@@ -587,45 +585,16 @@ func (c *DefaultStateMachineConfig) 
EvaluateExpression(expressionStr string, con
        return result, nil
 }
 
-func NewDefaultBusinessProcessor() *DefaultBusinessProcessor {
-       return &DefaultBusinessProcessor{
-               processHandlers: make(map[string]ProcessHandler),
-               routerHandlers:  make(map[string]RouterHandler),
-       }
-}
-
 func NewDefaultStateMachineConfig(opts ...Option) *DefaultStateMachineConfig {
        ctx := context.Background()
-       defaultBP := NewDefaultBusinessProcessor()
-
-       // stateMachineResources for development only; production uses env 
vars/config files.
-       stateMachineResources := []string{
-               "testdata/saga/statelang/**/*.json",
-               "testdata/saga/statelang/**/*.yaml",
-       }
-
-       if envPaths := os.Getenv("SEATA_STATE_MACHINE_RESOURCES"); envPaths != 
"" {
-               paths := strings.Split(envPaths, ",")
-               filteredPaths := make([]string, 0, len(paths))
-               for _, p := range paths {
-                       p = strings.TrimSpace(p)
-                       if p != "" {
-                               filteredPaths = append(filteredPaths, p)
-                       }
-               }
-               if len(filteredPaths) > 0 {
-                       stateMachineResources = filteredPaths
-               }
-       }
+       defaultBP := process_ctrl.NewBusinessProcessor()
 
        c := &DefaultStateMachineConfig{
-               transOperationTimeout: DefaultTransOperTimeout,
-               serviceInvokeTimeout:  DefaultServiceInvokeTimeout,
-               charset:               "UTF-8",
-               defaultTenantId:       "000001",
-
-               stateMachineResources: stateMachineResources,
-
+               transOperationTimeout:           DefaultTransOperTimeout,
+               serviceInvokeTimeout:            DefaultServiceInvokeTimeout,
+               charset:                         "UTF-8",
+               defaultTenantId:                 "000001",
+               stateMachineResources:           parseEnvResources(),
                sagaRetryPersistModeUpdate:      
DefaultClientSagaRetryPersistModeUpdate,
                sagaCompensatePersistModeUpdate: 
DefaultClientSagaCompensatePersistModeUpdate,
                sagaBranchRegisterEnable:        
DefaultClientSagaBranchRegisterEnable,
@@ -633,21 +602,31 @@ func NewDefaultStateMachineConfig(opts ...Option) 
*DefaultStateMachineConfig {
                stateMachineDefs:                
make(map[string]*statemachine.StateMachineObject),
                componentLock:                   &sync.Mutex{},
                seqGenerator:                    sequence.NewUUIDSeqGenerator(),
-
-               statusDecisionStrategy: NewDefaultStatusDecisionStrategy(),
-               processController: &ProcessControllerImpl{
-                       businessProcessor: defaultBP,
-               },
-
-               syncEventBus:  NewDirectEventBus(),
-               asyncEventBus: NewAsyncEventBus(ctx, 1000, 5),
+               statusDecisionStrategy:          
strategy.NewDefaultStatusDecisionStrategy(),
+               processController: func() process_ctrl.ProcessController {
+                       pc := &process_ctrl.ProcessControllerImpl{}
+                       pc.SetBusinessProcessor(defaultBP)
+                       return pc
+               }(),
+               syncEventBus:  process_ctrl.NewDirectEventBus(),
+               asyncEventBus: process_ctrl.NewAsyncEventBus(ctx, 1000, 5),
 
                syncProcessCtrlEventPublisher:  nil,
                asyncProcessCtrlEventPublisher: nil,
+
+               stateLogStore:  &NoopStateLogStore{},
+               stateLangStore: &NoopStateLangStore{},
        }
 
-       c.syncProcessCtrlEventPublisher = 
NewProcessCtrlEventPublisher(c.syncEventBus)
-       c.asyncProcessCtrlEventPublisher = 
NewProcessCtrlEventPublisher(c.asyncEventBus)
+       c.stateMachineRepository = repository.GetStateMachineRepositoryImpl()
+       c.stateLogRepository = repository.NewStateLogRepositoryImpl()
+
+       c.syncProcessCtrlEventPublisher = 
process_ctrl.NewProcessCtrlEventPublisher(c.syncEventBus)
+       c.asyncProcessCtrlEventPublisher = 
process_ctrl.NewProcessCtrlEventPublisher(c.asyncEventBus)
+
+       for _, opt := range opts {
+               opt(c)
+       }
 
        if err := c.LoadConfig("config.yaml"); err == nil {
                log.Printf("Successfully loaded config from config.yaml")
@@ -655,16 +634,26 @@ func NewDefaultStateMachineConfig(opts ...Option) 
*DefaultStateMachineConfig {
                log.Printf("Failed to load config file (using default/env 
values): %v", err)
        }
 
-       for _, opt := range opts {
-               opt(c)
-       }
-
        return c
 }
 
+func parseEnvResources() []string {
+       if env := os.Getenv("SEATA_STATE_MACHINE_RESOURCES"); env != "" {
+               parts := strings.Split(env, ",")
+               var res []string
+               for _, p := range parts {
+                       if p = strings.TrimSpace(p); p != "" {
+                               res = append(res, p)
+                       }
+               }
+               return res
+       }
+       return nil
+}
+
 type Option func(*DefaultStateMachineConfig)
 
-func WithStatusDecisionStrategy(strategy StatusDecisionStrategy) Option {
+func WithStatusDecisionStrategy(strategy engine.StatusDecisionStrategy) Option 
{
        return func(c *DefaultStateMachineConfig) {
                c.statusDecisionStrategy = strategy
        }
@@ -676,15 +665,19 @@ func WithSeqGenerator(gen sequence.SeqGenerator) Option {
        }
 }
 
-func WithProcessController(ctrl ProcessController) Option {
+func WithProcessController(ctrl process_ctrl.ProcessController) Option {
        return func(c *DefaultStateMachineConfig) {
                c.processController = ctrl
        }
 }
 
-func WithBusinessProcessor(bp BusinessProcessor) Option {
+func WithBusinessProcessor(bp process_ctrl.BusinessProcessor) Option {
        return func(c *DefaultStateMachineConfig) {
-               c.processController.(*ProcessControllerImpl).businessProcessor 
= bp
+               if pc, ok := 
c.processController.(*process_ctrl.ProcessControllerImpl); ok {
+                       pc.SetBusinessProcessor(bp)
+               } else {
+                       log.Printf("ProcessController is not of type 
*ProcessControllerImpl, unable to set BusinessProcessor")
+               }
        }
 }
 
@@ -696,25 +689,25 @@ func WithStateMachineResources(paths []string) Option {
        }
 }
 
-func WithStateLogRepository(logRepo StateLogRepository) Option {
+func WithStateLogRepository(logRepo repo.StateLogRepository) Option {
        return func(c *DefaultStateMachineConfig) {
                c.stateLogRepository = logRepo
        }
 }
 
-func WithStateLogStore(logStore StateLogStore) Option {
+func WithStateLogStore(logStore store.StateLogStore) Option {
        return func(c *DefaultStateMachineConfig) {
                c.stateLogStore = logStore
        }
 }
 
-func WithStateLangStore(langStore StateLangStore) Option {
+func WithStateLangStore(langStore store.StateLangStore) Option {
        return func(c *DefaultStateMachineConfig) {
                c.stateLangStore = langStore
        }
 }
 
-func WithStateMachineRepository(machineRepo StateMachineRepository) Option {
+func WithStateMachineRepository(machineRepo repo.StateMachineRepository) 
Option {
        return func(c *DefaultStateMachineConfig) {
                c.stateMachineRepository = machineRepo
        }
diff --git 
a/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go 
b/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
index 82e5c427..f3ad80ca 100644
--- a/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
+++ b/pkg/saga/statemachine/engine/config/default_statemachine_config_test.go
@@ -20,6 +20,7 @@ package config
 import (
        "errors"
        "io"
+       "os"
        "path/filepath"
        "testing"
 
@@ -28,6 +29,8 @@ import (
 )
 
 func TestDefaultStateMachineConfig_LoadValidJSON(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        testFile := filepath.Join("testdata", "order_saga.json")
 
@@ -43,6 +46,8 @@ func TestDefaultStateMachineConfig_LoadValidJSON(t 
*testing.T) {
 }
 
 func TestDefaultStateMachineConfig_LoadValidYAML(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        testFile := filepath.Join("testdata", "order_saga.yaml")
 
@@ -54,6 +59,8 @@ func TestDefaultStateMachineConfig_LoadValidYAML(t 
*testing.T) {
 }
 
 func TestLoadNonExistentFile(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        err := config.LoadConfig("non_existent.json")
        assert.Error(t, err, "Loading a non-existent file should report an 
error")
@@ -61,6 +68,8 @@ func TestLoadNonExistentFile(t *testing.T) {
 }
 
 func TestGetStateMachineDefinition_Exists(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        _ = config.LoadConfig(filepath.Join("testdata", "order_saga.json"))
 
@@ -70,12 +79,16 @@ func TestGetStateMachineDefinition_Exists(t *testing.T) {
 }
 
 func TestGetNonExistentStateMachine(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        smo := config.GetStateMachineDefinition("NonExistent")
        assert.Nil(t, smo, "An unloaded state machine should return nil")
 }
 
 func TestLoadDuplicateStateMachine(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        testFile := filepath.Join("testdata", "order_saga.json")
 
@@ -88,6 +101,8 @@ func TestLoadDuplicateStateMachine(t *testing.T) {
 }
 
 func TestRuntimeConfig_OverrideDefaults(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        assert.Equal(t, "UTF-8", config.charset, "The default character set 
should be UTF-8")
 
@@ -102,6 +117,8 @@ func TestRuntimeConfig_OverrideDefaults(t *testing.T) {
 }
 
 func TestGetDefaultExpressionFactory(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
 
        err := config.Init()
@@ -115,6 +132,8 @@ func TestGetDefaultExpressionFactory(t *testing.T) {
 }
 
 func TestGetServiceInvoker(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        if err := config.Init(); err != nil {
                t.Fatalf("init config failed: %v", err)
@@ -127,6 +146,8 @@ func TestGetServiceInvoker(t *testing.T) {
 }
 
 func TestLoadConfig_InvalidJSON(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        testFile := filepath.Join("testdata", "invalid.json")
 
@@ -136,6 +157,8 @@ func TestLoadConfig_InvalidJSON(t *testing.T) {
 }
 
 func TestLoadConfig_InvalidYAML(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        testFile := filepath.Join("testdata", "invalid.yaml")
 
@@ -145,6 +168,8 @@ func TestLoadConfig_InvalidYAML(t *testing.T) {
 }
 
 func TestRegisterStateMachineDef_Fail(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        invalidResource := []string{"invalid_path.json"}
 
@@ -154,6 +179,8 @@ func TestRegisterStateMachineDef_Fail(t *testing.T) {
 }
 
 func TestInit_ExpressionFactoryManagerNil(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        config.expressionFactoryManager = nil
 
@@ -162,6 +189,8 @@ func TestInit_ExpressionFactoryManagerNil(t *testing.T) {
 }
 
 func TestInit_ServiceInvokerManagerNil(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        config.serviceInvokerManager = nil
 
@@ -170,6 +199,8 @@ func TestInit_ServiceInvokerManagerNil(t *testing.T) {
 }
 
 func TestInit_StateMachineRepositoryNil(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        config.stateMachineRepository = nil
 
@@ -178,6 +209,8 @@ func TestInit_StateMachineRepositoryNil(t *testing.T) {
 }
 
 func TestApplyRuntimeConfig_BoundaryValues(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        customConfig := &ConfigFileParams{
                TransOperationTimeout: 1,
@@ -220,6 +253,8 @@ func (m *TestStateMachineRepositoryMock) 
RegistryStateMachineByReader(reader io.
 }
 
 func TestRegisterStateMachineDef_RepositoryError(t *testing.T) {
+       os.Unsetenv("SEATA_STATE_MACHINE_RESOURCES")
+
        config := NewDefaultStateMachineConfig()
        config.stateMachineRepository = &TestStateMachineRepositoryMock{}
        resource := []string{filepath.Join("testdata", "order_saga.json")}
diff --git a/pkg/saga/statemachine/engine/config/noop_store.go 
b/pkg/saga/statemachine/engine/config/noop_store.go
new file mode 100644
index 00000000..f40f6278
--- /dev/null
+++ b/pkg/saga/statemachine/engine/config/noop_store.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+// NoopStateLogStore is a no-op implementation of StateLogStore for 
out-of-the-box scenarios.
+// All methods perform no actual operations and return nil or zero values to 
ensure validation passes.
+type NoopStateLogStore struct{}
+
+func (s *NoopStateLogStore) RecordStateMachineStarted(ctx context.Context, 
machineInstance statelang.StateMachineInstance, pc process_ctrl.ProcessContext) 
error {
+       return nil
+}
+
+func (s *NoopStateLogStore) RecordStateMachineFinished(ctx context.Context, 
machineInstance statelang.StateMachineInstance, pc process_ctrl.ProcessContext) 
error {
+       return nil
+}
+
+func (s *NoopStateLogStore) RecordStateMachineRestarted(ctx context.Context, 
machineInstance statelang.StateMachineInstance, pc process_ctrl.ProcessContext) 
error {
+       return nil
+}
+
+func (s *NoopStateLogStore) RecordStateStarted(ctx context.Context, 
stateInstance statelang.StateInstance, pc process_ctrl.ProcessContext) error {
+       return nil
+}
+
+func (s *NoopStateLogStore) RecordStateFinished(ctx context.Context, 
stateInstance statelang.StateInstance, pc process_ctrl.ProcessContext) error {
+       return nil
+}
+
+func (s *NoopStateLogStore) GetStateMachineInstance(stateMachineInstanceId 
string) (statelang.StateMachineInstance, error) {
+       return nil, nil
+}
+
+func (s *NoopStateLogStore) GetStateMachineInstanceByBusinessKey(businessKey 
string, tenantId string) (statelang.StateMachineInstance, error) {
+       return nil, nil
+}
+
+func (s *NoopStateLogStore) GetStateMachineInstanceByParentId(parentId string) 
([]statelang.StateMachineInstance, error) {
+       return nil, nil
+}
+
+func (s *NoopStateLogStore) GetStateInstance(stateInstanceId string, 
stateMachineInstanceId string) (statelang.StateInstance, error) {
+       return nil, nil
+}
+
+func (s *NoopStateLogStore) 
GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string) 
([]statelang.StateInstance, error) {
+       return nil, nil
+}
+
+func (s *NoopStateLogStore) ClearUp(pc process_ctrl.ProcessContext) {
+       // no-op
+}
+
+type NoopStateLangStore struct{}
+
+func (s *NoopStateLangStore) GetStateMachineById(stateMachineId string) 
(statelang.StateMachine, error) {
+       return nil, nil
+}
+
+func (s *NoopStateLangStore) GetLastVersionStateMachine(stateMachineName 
string, tenantId string) (statelang.StateMachine, error) {
+       return nil, nil
+}
+
+func (s *NoopStateLangStore) StoreStateMachine(stateMachine 
statelang.StateMachine) error {
+       return nil
+}
diff --git a/pkg/saga/statemachine/engine/core/testdata/invalid.json 
b/pkg/saga/statemachine/engine/config/testdata/invalid.json
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/invalid.json
rename to pkg/saga/statemachine/engine/config/testdata/invalid.json
diff --git a/pkg/saga/statemachine/engine/core/testdata/invalid.json.comment 
b/pkg/saga/statemachine/engine/config/testdata/invalid.json.comment
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/invalid.json.comment
rename to pkg/saga/statemachine/engine/config/testdata/invalid.json.comment
diff --git a/pkg/saga/statemachine/engine/core/testdata/invalid.yaml 
b/pkg/saga/statemachine/engine/config/testdata/invalid.yaml
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/invalid.yaml
rename to pkg/saga/statemachine/engine/config/testdata/invalid.yaml
diff --git a/pkg/saga/statemachine/engine/core/testdata/order_saga.json 
b/pkg/saga/statemachine/engine/config/testdata/order_saga.json
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/order_saga.json
rename to pkg/saga/statemachine/engine/config/testdata/order_saga.json
diff --git a/pkg/saga/statemachine/engine/core/testdata/order_saga.json.comment 
b/pkg/saga/statemachine/engine/config/testdata/order_saga.json.comment
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/order_saga.json.comment
rename to pkg/saga/statemachine/engine/config/testdata/order_saga.json.comment
diff --git a/pkg/saga/statemachine/engine/core/testdata/order_saga.yaml 
b/pkg/saga/statemachine/engine/config/testdata/order_saga.yaml
similarity index 100%
rename from pkg/saga/statemachine/engine/core/testdata/order_saga.yaml
rename to pkg/saga/statemachine/engine/config/testdata/order_saga.yaml
diff --git 
a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go 
b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
index a3e64b2a..cc51b552 100644
--- a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
+++ b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
@@ -142,7 +142,7 @@ func (p ProcessCtrlStateMachineEngine) 
ReloadStateMachineInstance(ctx context.Co
 func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, 
stateMachineName string, tenantId string,
        businessKey string, startParams map[string]interface{}, async bool, 
callback engine.CallBack) (statelang.StateMachineInstance, error) {
        if tenantId == "" {
-               tenantId = p.StateMachineConfig.DefaultTenantId()
+               tenantId = p.StateMachineConfig.GetDefaultTenantId()
        }
 
        stateMachineInstance, err := p.createMachineInstance(stateMachineName, 
tenantId, businessKey, startParams)
@@ -286,7 +286,7 @@ func (p ProcessCtrlStateMachineEngine) forwardInternal(ctx 
context.Context, stat
                }
                inst.SetStateName(next)
        } else {
-               if lastForwardState.Status() == statelang.RU && 
!pcext.IsTimeout(lastForwardState.StartedTime(), 
p.StateMachineConfig.ServiceInvokeTimeout()) {
+               if lastForwardState.Status() == statelang.RU && 
!pcext.IsTimeout(lastForwardState.StartedTime(), 
p.StateMachineConfig.GetServiceInvokeTimeout()) {
                        return nil, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
                                fmt.Sprintf("State [%s] is running, 
operation[forward] denied", lastForwardState.Name()), nil)
                }
@@ -610,7 +610,7 @@ func (p ProcessCtrlStateMachineEngine) checkStatus(ctx 
context.Context, stateMac
        }
 
        if stateMachineInstance.IsRunning() &&
-               !pcext.IsTimeout(stateMachineInstance.UpdatedTime(), 
p.StateMachineConfig.TransOperationTimeout()) {
+               !pcext.IsTimeout(stateMachineInstance.UpdatedTime(), 
p.StateMachineConfig.GetTransOperationTimeout()) {
                return false, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
                        "StateMachineInstance 
[id:"+stateMachineInstance.ID()+"] is running, operation["+operation+
                                "] denied", nil)
diff --git 
a/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go 
b/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
index c9c10480..316e7c2a 100644
--- a/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
+++ b/pkg/saga/statemachine/engine/repo/repository/state_log_repository.go
@@ -23,22 +23,22 @@ import (
        "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl"
        "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
        "github.com/seata/seata-go/pkg/saga/statemachine/store"
+       "sync"
 )
 
 var (
-       stateLogRepositoryImpl *StateLogRepositoryImpl
+       stateLogRepositoryImpl     *StateLogRepositoryImpl
+       onceStateLogRepositoryImpl sync.Once
 )
 
 type StateLogRepositoryImpl struct {
        stateLogStore store.StateLogStore
 }
 
-func NewStateLogRepositoryImpl(stateLogStore store.StateLogStore) 
*StateLogRepositoryImpl {
-       if stateLogRepositoryImpl == nil {
-               stateLogRepositoryImpl = &StateLogRepositoryImpl{
-                       stateLogStore: stateLogStore,
-               }
-       }
+func NewStateLogRepositoryImpl() *StateLogRepositoryImpl {
+       onceStateLogRepositoryImpl.Do(func() {
+               stateLogRepositoryImpl = &StateLogRepositoryImpl{}
+       })
        return stateLogRepositoryImpl
 }
 
@@ -125,7 +125,6 @@ func (s *StateLogRepositoryImpl) 
GetStateInstance(stateInstanceId, machineInstId
        return s.stateLogStore.GetStateInstance(stateInstanceId, machineInstId)
 }
 
-
 func (s *StateLogRepositoryImpl) 
GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string) 
([]statelang.StateInstance, error) {
        if s.stateLogStore == nil {
                return nil, errors.New("stateLogStore is not initialized")
diff --git a/pkg/saga/statemachine/process_ctrl/bussiness_processor.go 
b/pkg/saga/statemachine/process_ctrl/bussiness_processor.go
index 5c830d21..369972a1 100644
--- a/pkg/saga/statemachine/process_ctrl/bussiness_processor.go
+++ b/pkg/saga/statemachine/process_ctrl/bussiness_processor.go
@@ -31,6 +31,13 @@ type BusinessProcessor interface {
        Route(ctx context.Context, processContext ProcessContext) error
 }
 
+func NewBusinessProcessor() BusinessProcessor {
+       return &DefaultBusinessProcessor{
+               processHandlers: make(map[string]ProcessHandler),
+               routerHandlers:  make(map[string]RouterHandler),
+       }
+}
+
 type DefaultBusinessProcessor struct {
        processHandlers map[string]ProcessHandler
        routerHandlers  map[string]RouterHandler
diff --git a/pkg/saga/statemachine/process_ctrl/event_consumer.go 
b/pkg/saga/statemachine/process_ctrl/event_consumer.go
index 68dd90ae..8672624f 100644
--- a/pkg/saga/statemachine/process_ctrl/event_consumer.go
+++ b/pkg/saga/statemachine/process_ctrl/event_consumer.go
@@ -43,8 +43,14 @@ func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
 
 func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event) 
error {
        processContext, ok := event.(ProcessContext)
-  if !ok {
+       if !ok {
                return fmt.Errorf("event %T is illegal, required 
process_ctrl.ProcessContext", event)
        }
        return p.processController.Process(ctx, processContext)
 }
+
+func NewProcessCtrlEventConsumer(controller ProcessController) EventConsumer {
+       return &ProcessCtrlEventConsumer{
+               processController: controller,
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to