Copilot commented on code in PR #7865:
URL: https://github.com/apache/incubator-seata/pull/7865#discussion_r2670748469


##########
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/ATModeExecutor.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.benchmark.executor;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.seata.benchmark.config.BenchmarkConfig;
+import org.apache.seata.benchmark.constant.BenchmarkConstants;
+import org.apache.seata.rm.datasource.DataSourceProxy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * AT mode transaction executor supporting both empty and real transaction 
modes
+ * - branches == 0: Empty transaction mode (pure Seata protocol overhead 
testing)
+ * - branches > 0: Real mode with MySQL database (via Testcontainers)
+ */
+public class ATModeExecutor extends AbstractTransactionExecutor {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ATModeExecutor.class);
+
+    private MySQLContainer<?> mysqlContainer;
+    private HikariDataSource rawDataSource;
+    private DataSourceProxy dataSourceProxy;
+
+    public ATModeExecutor(BenchmarkConfig config) {
+        super(config);
+    }
+
+    private boolean isRealMode() {
+        return config.getBranches() > 0;
+    }
+
+    @Override
+    public void init() {
+        if (isRealMode()) {
+            LOGGER.info("Initializing AT mode executor (MySQL via 
Testcontainers)");
+            initRealMode();
+        } else {
+            LOGGER.info("AT mode executor initialized (empty transaction 
mode)");
+        }
+    }
+
+    private void initRealMode() {
+        // Start MySQL container
+        startMySQLContainer();
+
+        // Create HikariCP connection pool
+        createDataSource();
+
+        // Initialize database schema and data
+        initDatabase();
+
+        // Wrap with Seata DataSourceProxy for AT mode
+        dataSourceProxy = new DataSourceProxy(rawDataSource);
+
+        LOGGER.info("DataSourceProxy initialized, dbType: {}", 
dataSourceProxy.getDbType());
+        LOGGER.info("Real AT mode executor initialized with {} accounts", 
BenchmarkConstants.ACCOUNT_COUNT);
+    }
+
+    private void startMySQLContainer() {
+        LOGGER.info("Starting MySQL container...");
+        mysqlContainer = new 
MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                .withDatabaseName("benchmark")
+                .withUsername("test")
+                .withPassword("test")
+                .withCommand("--character-set-server=utf8mb4", 
"--collation-server=utf8mb4_unicode_ci");
+
+        mysqlContainer.start();
+
+        LOGGER.info("MySQL container started: {}", 
mysqlContainer.getJdbcUrl());
+    }
+
+    private void createDataSource() {
+        HikariConfig hikariConfig = new HikariConfig();
+        hikariConfig.setJdbcUrl(mysqlContainer.getJdbcUrl());
+        hikariConfig.setUsername(mysqlContainer.getUsername());
+        hikariConfig.setPassword(mysqlContainer.getPassword());
+        hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver");
+        hikariConfig.setMaximumPoolSize(config.getThreads() * 2);
+        hikariConfig.setMinimumIdle(config.getThreads());
+        hikariConfig.setConnectionTimeout(30000);
+        hikariConfig.setIdleTimeout(600000);
+        hikariConfig.setMaxLifetime(1800000);
+
+        rawDataSource = new HikariDataSource(hikariConfig);
+        LOGGER.info("HikariCP DataSource created");
+    }
+
+    private void initDatabase() {
+        try (Connection conn = rawDataSource.getConnection();
+                Statement stmt = conn.createStatement()) {
+
+            // Create accounts table
+            stmt.execute("CREATE TABLE IF NOT EXISTS accounts ("
+                    + "id BIGINT PRIMARY KEY, "
+                    + "balance INT NOT NULL, "
+                    + "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP"
+                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4");
+
+            // Create undo_log table for Seata AT mode (MySQL syntax)
+            stmt.execute("CREATE TABLE IF NOT EXISTS undo_log ("
+                    + "branch_id BIGINT NOT NULL, "
+                    + "xid VARCHAR(128) NOT NULL, "
+                    + "context VARCHAR(128) NOT NULL, "
+                    + "rollback_info LONGBLOB NOT NULL, "
+                    + "log_status INT NOT NULL, "
+                    + "log_created DATETIME(6) NOT NULL, "
+                    + "log_modified DATETIME(6) NOT NULL, "
+                    + "UNIQUE KEY ux_undo_log (xid, branch_id)"
+                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4");
+
+            // Insert test data
+            stmt.execute("TRUNCATE TABLE accounts");
+            try (PreparedStatement pstmt = conn.prepareStatement("INSERT INTO 
accounts (id, balance) VALUES (?, ?)")) {
+                for (int i = 1; i <= BenchmarkConstants.ACCOUNT_COUNT; i++) {
+                    pstmt.setLong(1, i);
+                    pstmt.setInt(2, BenchmarkConstants.INITIAL_BALANCE);
+                    pstmt.addBatch();
+                    if (i % 100 == 0) {
+                        pstmt.executeBatch();
+                    }
+                }
+                pstmt.executeBatch();
+            }
+
+            LOGGER.info(
+                    "Database initialized: {} accounts with balance {}",
+                    BenchmarkConstants.ACCOUNT_COUNT,
+                    BenchmarkConstants.INITIAL_BALANCE);
+
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to initialize database", e);
+        }
+    }
+
+    @Override
+    protected String getTransactionName() {
+        return isRealMode() ? "benchmark-real-at-tx" : "benchmark-at-tx";
+    }
+
+    @Override
+    protected int getBranchCount() {
+        return config.getBranches();
+    }
+
+    @Override
+    protected void executeBusinessLogic() throws Exception {
+        if (isRealMode()) {
+            executeBranchOperations(config.getBranches());
+        }
+        // Empty mode: do nothing (pure Seata protocol overhead testing)
+    }
+
+    private void executeBranchOperations(int branchCount) throws SQLException {
+        // Execute N branch operations (simulating distributed transaction 
branches)
+        for (int i = 0; i < branchCount; i++) {
+            executeSingleBranch();
+        }
+    }
+
+    @SuppressWarnings("lgtm[java/insecure-randomness]")
+    private void executeSingleBranch() throws SQLException {
+        // Use DataSourceProxy connection to enable AT mode
+        try (Connection conn = dataSourceProxy.getConnection()) {
+            conn.setAutoCommit(false);
+
+            // Transfer between two random accounts
+            long fromAccount = 
(ThreadLocalRandom.current().nextInt(BenchmarkConstants.ACCOUNT_COUNT) + 1);
+            long toAccount = 
(ThreadLocalRandom.current().nextInt(BenchmarkConstants.ACCOUNT_COUNT) + 1);
+            while (toAccount == fromAccount) {
+                toAccount = 
(ThreadLocalRandom.current().nextInt(BenchmarkConstants.ACCOUNT_COUNT) + 1);
+            }
+            int amount = 
ThreadLocalRandom.current().nextInt(BenchmarkConstants.MAX_TRANSFER_AMOUNT)
+                    + BenchmarkConstants.MIN_TRANSFER_AMOUNT;
+
+            // Debit from source account
+            try (PreparedStatement pstmt =
+                    conn.prepareStatement("UPDATE accounts SET balance = 
balance - ? WHERE id = ?")) {
+                pstmt.setInt(1, amount);
+                pstmt.setLong(2, fromAccount);
+                pstmt.executeUpdate();

Review Comment:
   The account balance updates don't check for sufficient balance before 
debiting, which could result in negative balances. While this is a benchmark 
tool and not production code, it may lead to unrealistic test scenarios and 
could mask potential issues with real AT mode transactions. Consider adding a 
balance check or initializing accounts with sufficiently high balances to 
prevent this.
   ```suggestion
               // Debit from source account, ensuring sufficient balance to avoid negative values
               try (PreparedStatement pstmt =
                       conn.prepareStatement("UPDATE accounts SET balance = balance - ? WHERE id = ? AND balance >= ?")) {
                   pstmt.setInt(1, amount);
                   pstmt.setLong(2, fromAccount);
                   pstmt.setInt(3, amount);
                   int updatedRows = pstmt.executeUpdate();
                   // If no row was updated, there was insufficient balance; roll back and skip this transfer
                   if (updatedRows == 0) {
                       conn.rollback();
                       return;
                   }
   ```



##########
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/saga/BenchmarkServiceInvoker.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.benchmark.saga;
+
+import org.apache.seata.saga.engine.invoker.ServiceInvoker;
+import org.apache.seata.saga.statelang.domain.ServiceTaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Service invoker for benchmark Saga services.
+ * Invokes registered service methods based on service name and method name.
+ */
+public class BenchmarkServiceInvoker implements ServiceInvoker {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BenchmarkServiceInvoker.class);
+
+    private final Map<String, Object> serviceRegistry = new HashMap<>();
+
+    public void registerService(String serviceName, Object service) {
+        serviceRegistry.put(serviceName, service);
+        LOGGER.debug("Registered service: {}", serviceName);
+    }
+
+    @Override
+    public Object invoke(ServiceTaskState serviceTaskState, Object... input) 
throws Exception {
+        String serviceName = serviceTaskState.getServiceName();
+        String methodName = serviceTaskState.getServiceMethod();
+
+        Object service = serviceRegistry.get(serviceName);
+        if (service == null) {
+            throw new IllegalArgumentException("Service not found: " + 
serviceName);
+        }
+
+        // Find the method
+        Method method = findMethod(service.getClass(), methodName);
+        if (method == null) {
+            throw new NoSuchMethodException("Method not found: " + methodName 
+ " in service: " + serviceName);
+        }
+
+        // Prepare input parameters
+        Object inputParam = prepareInput(input);
+
+        LOGGER.debug("Invoking service: {}.{}()", serviceName, methodName);
+
+        // Invoke the method
+        return method.invoke(service, inputParam);
+    }
+
+    private Method findMethod(Class<?> clazz, String methodName) {
+        for (Method method : clazz.getMethods()) {
+            if (method.getName().equals(methodName)) {
+                return method;
+            }
+        }
+        return null;
+    }

Review Comment:
   The findMethod implementation returns the first method matching the name 
without checking parameter types or count. If a class has overloaded methods 
with the same name, this will return an arbitrary match which may not be the 
intended method. Consider checking parameter compatibility or documenting that 
overloaded methods are not supported.



##########
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/config/BenchmarkConfigLoader.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.benchmark.config;
+
+import org.apache.seata.common.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.LoaderOptions;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Configuration loader for benchmark
+ */
+public class BenchmarkConfigLoader {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BenchmarkConfigLoader.class);
+    private static final String DEFAULT_CONFIG_FILE = "benchmark.yaml";
+    private static final String ENV_CONFIG_FILE = "BENCHMARK_CONFIG_FILE";
+    private static final String SYS_CONFIG_FILE = "benchmark.config.file";
+
+    public static BenchmarkConfig load(String configFile) throws IOException {
+        BenchmarkConfig config;
+        String configSource;
+
+        if (configFile != null && !configFile.isEmpty()) {
+            config = loadFromFile(configFile);
+            configSource = "CLI argument: " + configFile;
+        } else {
+            String envConfig = System.getenv(ENV_CONFIG_FILE);
+            if (envConfig != null && !envConfig.isEmpty()) {
+                config = loadFromFile(envConfig);
+                configSource = "Environment variable " + ENV_CONFIG_FILE + ": 
" + envConfig;
+            } else {
+                String sysConfig = System.getProperty(SYS_CONFIG_FILE);
+                if (sysConfig != null && !sysConfig.isEmpty()) {
+                    config = loadFromFile(sysConfig);
+                    configSource = "System property " + SYS_CONFIG_FILE + ": " 
+ sysConfig;
+                } else {
+                    config = loadDefault();
+                    configSource = "Default classpath: " + DEFAULT_CONFIG_FILE;
+                }
+            }
+        }
+
+        LOGGER.info("Loading configuration from: {}", configSource);
+        config.validate();
+        return config;
+    }
+
+    private static BenchmarkConfig loadFromFile(String filePath) throws 
IOException {
+        try (FileInputStream fis = new FileInputStream(filePath)) {
+            return loadFromStream(fis);
+        }
+    }
+
+    public static BenchmarkConfig loadDefault() throws IOException {
+        try (InputStream is = 
BenchmarkConfigLoader.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG_FILE))
 {
+            if (is == null) {
+                LOGGER.warn("Default configuration file not found, using 
built-in defaults");
+                return new BenchmarkConfig();
+            }
+            return loadFromStream(is);
+        }
+    }
+
+    private static BenchmarkConfig loadFromStream(InputStream inputStream) {
+        LoaderOptions loaderOptions = new LoaderOptions();
+        Constructor constructor = new Constructor(BenchmarkConfig.class, 
loaderOptions);
+        Yaml yaml = new Yaml(constructor);
+        return yaml.load(inputStream);

Review Comment:
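   The SnakeYAML library should be configured with SafeConstructor instead of 
Constructor to prevent arbitrary code execution via YAML deserialization. This 
is a known security vulnerability (CVE-2022-1471 and related): on SnakeYAML 
versions before 2.0, the generic Constructor lets a YAML document instantiate 
arbitrary classes through type tags, even when a root type such as 
BenchmarkConfig is supplied. Use SafeConstructor or configure LoaderOptions to 
restrict allowed types. Note that SafeConstructor only produces standard Java 
types (maps, lists, scalars), so with that option the loaded values have to be 
mapped onto BenchmarkConfig explicitly.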



##########
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.benchmark.executor;
+
+import org.apache.seata.benchmark.config.BenchmarkConfig;
+import org.apache.seata.benchmark.model.TransactionRecord;
+import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker;
+import org.apache.seata.benchmark.saga.InventorySagaService;
+import org.apache.seata.benchmark.saga.OrderSagaService;
+import org.apache.seata.benchmark.saga.PaymentSagaService;
+import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory;
+import org.apache.seata.core.exception.TransactionException;
+import org.apache.seata.core.model.GlobalStatus;
+import org.apache.seata.saga.engine.StateMachineEngine;
+import org.apache.seata.saga.engine.config.AbstractStateMachineConfig;
+import org.apache.seata.saga.engine.expression.ExpressionFactoryManager;
+import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;
+import org.apache.seata.saga.statelang.domain.DomainConstants;
+import org.apache.seata.saga.statelang.domain.ExecutionStatus;
+import org.apache.seata.saga.statelang.domain.StateMachineInstance;
+import org.apache.seata.tm.api.GlobalTransaction;
+import org.apache.seata.tm.api.GlobalTransactionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED;
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED;
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED;
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED;
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN;
+
+/**
+ * Saga mode transaction executor supporting both mock and real modes
+ * - branches == 0: Mock mode (simplified Saga simulation without state 
machine)
+ * - branches > 0: Real mode (state machine engine with compensation support)
+ */
+public class SagaModeExecutor implements TransactionExecutor {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SagaModeExecutor.class);
+    private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0);
+
+    private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga";
+    private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga";
+
+    private final BenchmarkConfig config;
+    private StateMachineEngine stateMachineEngine;
+    private BenchmarkStateMachineConfig stateMachineConfig;
+
+    public SagaModeExecutor(BenchmarkConfig config) {
+        this.config = config;
+    }
+
+    private boolean isRealMode() {
+        return config.getBranches() > 0;
+    }
+
+    @Override
+    public void init() {
+        if (isRealMode()) {
+            initRealMode();
+        } else {
+            LOGGER.info("Saga mode executor initialized (simplified mock 
mode)");
+            LOGGER.info("Note: Full Saga annotation support requires Spring 
framework integration");
+        }
+    }
+
+    private void initRealMode() {
+        LOGGER.info("Initializing Real Saga mode executor with state machine 
engine");
+
+        try {
+            // Create and configure state machine config
+            stateMachineConfig = new BenchmarkStateMachineConfig();
+            
stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage());
+            stateMachineConfig.init();
+
+            // Create state machine engine
+            ProcessCtrlStateMachineEngine engine = new 
ProcessCtrlStateMachineEngine();
+            engine.setStateMachineConfig(stateMachineConfig);
+            this.stateMachineEngine = engine;
+
+            LOGGER.info("Real Saga mode executor initialized");
+            LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, 
ORDER_SAGA_NAME);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to initialize Saga state 
machine engine", e);
+        }
+    }
+
+    @Override
+    public TransactionRecord execute() {
+        if (isRealMode()) {
+            return executeRealMode();
+        } else {
+            return executeMockMode();
+        }
+    }
+
+    private TransactionRecord executeMockMode() {
+        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
+        long startTime = System.currentTimeMillis();
+        String xid = null;
+        String status = STATUS_UNKNOWN;
+        int branchCount = config.getBranches();
+        boolean success = false;
+
+        try {
+            tx.begin(30000, "benchmark-saga-tx");
+            xid = tx.getXid();
+
+            // Simulate Saga actions (forward phase)
+            for (int i = 0; i < branchCount; i++) {
+                simulateSagaAction(i);
+            }
+
+            if (shouldRollback()) {
+                // Simulate compensation (backward phase)
+                for (int i = branchCount - 1; i >= 0; i--) {
+                    simulateCompensation(i);
+                }
+                tx.rollback();
+                status = STATUS_COMPENSATED;
+            } else {
+                tx.commit();
+                status = STATUS_COMMITTED;
+                success = true;
+            }
+
+        } catch (TransactionException e) {
+            LOGGER.debug("Transaction failed: {}", e.getMessage());
+            status = STATUS_FAILED;
+            try {
+                if (tx.getStatus() != GlobalStatus.Rollbacked && 
tx.getStatus() != GlobalStatus.RollbackFailed) {
+                    tx.rollback();
+                }
+            } catch (TransactionException rollbackEx) {
+                LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage());
+            }
+        }
+
+        long duration = System.currentTimeMillis() - startTime;
+        return new TransactionRecord(xid, status, duration, branchCount, 
success);
+    }
+
+    private TransactionRecord executeRealMode() {
+        long startTime = System.currentTimeMillis();
+        String businessKey = "benchmark-" + 
BUSINESS_KEY_COUNTER.incrementAndGet();
+        String status = STATUS_UNKNOWN;
+        int branchCount = config.getBranches();
+        boolean success = false;
+
+        try {
+            // Choose state machine based on branch count
+            String stateMachineName = branchCount >= 3 ? ORDER_SAGA_NAME : 
SIMPLE_SAGA_NAME;
+
+            // Prepare start parameters
+            Map<String, Object> startParams = createStartParams();
+
+            // Execute state machine
+            StateMachineInstance instance = 
stateMachineEngine.startWithBusinessKey(
+                    stateMachineName, stateMachineConfig.getDefaultTenantId(), 
businessKey, startParams);
+
+            // Check execution result
+            ExecutionStatus executionStatus = instance.getStatus();
+            ExecutionStatus compensationStatus = 
instance.getCompensationStatus();
+
+            if (ExecutionStatus.SU.equals(executionStatus)) {
+                status = STATUS_COMMITTED;
+                success = true;
+            } else if (ExecutionStatus.FA.equals(executionStatus)) {
+                if (compensationStatus != null) {
+                    if (ExecutionStatus.SU.equals(compensationStatus)) {
+                        status = STATUS_COMPENSATED;
+                    } else {
+                        status = STATUS_COMPENSATION_FAILED;
+                    }
+                } else {
+                    status = STATUS_FAILED;
+                }
+            } else if (ExecutionStatus.UN.equals(executionStatus)) {
+                status = STATUS_UNKNOWN;
+            } else {
+                status = executionStatus != null ? executionStatus.name() : 
STATUS_UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            LOGGER.debug("Saga execution failed: {}", e.getMessage());
+            status = STATUS_FAILED;
+        }
+
+        long duration = System.currentTimeMillis() - startTime;
+        return new TransactionRecord(businessKey, status, duration, 
branchCount, success);
+    }
+
+    @SuppressWarnings("lgtm[java/insecure-randomness]")
+    private Map<String, Object> createStartParams() {
+        Map<String, Object> params = new HashMap<>();
+        params.put("userId", "user-" + 
ThreadLocalRandom.current().nextInt(1000));
+        params.put("productId", "product-" + 
ThreadLocalRandom.current().nextInt(100));
+        params.put("quantity", ThreadLocalRandom.current().nextInt(10) + 1);
+        params.put("amount", new 
BigDecimal(ThreadLocalRandom.current().nextInt(1000) + 100));
+        params.put("accountId", "account-" + 
ThreadLocalRandom.current().nextInt(1000));
+        return params;
+    }
+
+    private void simulateSagaAction(int branchId) {
+        // Simulated Saga forward action
+        // In real implementation, this would be a @CompensationBusinessAction 
annotated method
+        LOGGER.trace("Executing Saga action for branch {}", branchId);
+    }
+
+    private void simulateCompensation(int branchId) {
+        // Simulated Saga compensation action
+        // In real implementation, this would be the compensationMethod
+        LOGGER.trace("Executing compensation for branch {}", branchId);
+    }
+
+    @SuppressWarnings("lgtm[java/insecure-randomness]")
+    private boolean shouldRollback() {
+        return ThreadLocalRandom.current().nextInt(100) < 
config.getRollbackPercentage();
+    }
+
+    @Override
+    public void destroy() {
+        if (isRealMode()) {
+            destroyRealMode();
+        }
+        LOGGER.info("Saga mode executor destroyed");
+    }
+
+    private void destroyRealMode() {
+        LOGGER.info("Destroying Real Saga mode resources");
+        // StateMachineEngine doesn't have a close method
+        stateMachineEngine = null;
+        stateMachineConfig = null;
+    }
+
+    /**
+     * Custom StateMachineConfig for benchmark testing.
+     */
+    private static class BenchmarkStateMachineConfig extends 
AbstractStateMachineConfig {
+
+        private int rollbackPercentage = 0;
+
+        public void setRollbackPercentage(int rollbackPercentage) {
+            this.rollbackPercentage = rollbackPercentage;
+        }
+
+        @Override
+        public void init() throws Exception {
+            // Load state machine definitions from classpath
+            InputStream simpleSagaStream =
+                    
getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json");
+            InputStream orderSagaStream =
+                    
getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json");
+
+            if (simpleSagaStream == null || orderSagaStream == null) {
+                throw new RuntimeException("Failed to load Saga state machine 
definitions from classpath");
+            }
+
+            setStateMachineDefInputStreamArray(new InputStream[] 
{simpleSagaStream, orderSagaStream});
+
+            // Initialize parent config
+            super.init();
+
+            // Register SpEL expression factory for parameter evaluation
+            ExpressionFactoryManager expressionFactoryManager = 
getExpressionFactoryManager();
+            SimpleSpelExpressionFactory spelExpressionFactory = new 
SimpleSpelExpressionFactory();
+            // Register for default type (when expression doesn't start with $)
+            expressionFactoryManager.putExpressionFactory(
+                    ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, 
spelExpressionFactory);
+            // Register for empty type (when expression is like $.xxx where 
type is empty string)
+            expressionFactoryManager.putExpressionFactory("", 
spelExpressionFactory);
+
+            // Register benchmark services with the service invoker manager
+            BenchmarkServiceInvoker serviceInvoker = new 
BenchmarkServiceInvoker();
+
+            // Register services with configured rollback percentage
+            // Divide rollback percentage by 3 for each service so total 
probability is approximately correct
+            int serviceRollbackPct = Math.max(1, rollbackPercentage / 3);
+
+            serviceInvoker.registerService("orderService", new 
OrderSagaService(serviceRollbackPct, 5));
+            serviceInvoker.registerService("inventoryService", new 
InventorySagaService(serviceRollbackPct, 5));
+            serviceInvoker.registerService("paymentService", new 
PaymentSagaService(serviceRollbackPct, 5));

Review Comment:
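   The rollback percentage distribution logic divides by 3 and uses Math.max(1, 
...), which means even when rollbackPercentage is 0, each service will have a 
1% failure rate. This changes the expected behavior where 0 should mean no 
failures. Consider using Math.max(0, rollbackPercentage / 3) or skipping the 
division entirely if rollbackPercentage is 0. Be aware that with integer 
division Math.max(0, rollbackPercentage / 3) also maps configured values of 1 
and 2 to 0; to keep a non-zero rate for those, use something like 
`rollbackPercentage == 0 ? 0 : Math.max(1, rollbackPercentage / 3)`.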



##########
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.benchmark.executor;
+
+import org.apache.seata.benchmark.config.BenchmarkConfig;
+import org.apache.seata.benchmark.model.TransactionRecord;
+import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker;
+import org.apache.seata.benchmark.saga.InventorySagaService;
+import org.apache.seata.benchmark.saga.OrderSagaService;
+import org.apache.seata.benchmark.saga.PaymentSagaService;
+import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory;
+import org.apache.seata.core.exception.TransactionException;
+import org.apache.seata.core.model.GlobalStatus;
+import org.apache.seata.saga.engine.StateMachineEngine;
+import org.apache.seata.saga.engine.config.AbstractStateMachineConfig;
+import org.apache.seata.saga.engine.expression.ExpressionFactoryManager;
+import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;
+import org.apache.seata.saga.statelang.domain.DomainConstants;
+import org.apache.seata.saga.statelang.domain.ExecutionStatus;
+import org.apache.seata.saga.statelang.domain.StateMachineInstance;
+import org.apache.seata.tm.api.GlobalTransaction;
+import org.apache.seata.tm.api.GlobalTransactionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED;
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED;
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED;
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED;
+import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN;
+
+/**
+ * Saga mode transaction executor supporting both mock and real modes
+ * - branches == 0: Mock mode (simplified Saga simulation without state 
machine)
+ * - branches > 0: Real mode (state machine engine with compensation support)
+ */
+public class SagaModeExecutor implements TransactionExecutor {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SagaModeExecutor.class);
+    private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0);
+
+    private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga";
+    private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga";
+
+    private final BenchmarkConfig config;
+    private StateMachineEngine stateMachineEngine;
+    private BenchmarkStateMachineConfig stateMachineConfig;
+
+    public SagaModeExecutor(BenchmarkConfig config) {
+        this.config = config;
+    }
+
+    private boolean isRealMode() {
+        return config.getBranches() > 0;
+    }
+
+    @Override
+    public void init() {
+        if (isRealMode()) {
+            initRealMode();
+        } else {
+            LOGGER.info("Saga mode executor initialized (simplified mock 
mode)");
+            LOGGER.info("Note: Full Saga annotation support requires Spring 
framework integration");
+        }
+    }
+
+    private void initRealMode() {
+        LOGGER.info("Initializing Real Saga mode executor with state machine 
engine");
+
+        try {
+            // Create and configure state machine config
+            stateMachineConfig = new BenchmarkStateMachineConfig();
+            
stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage());
+            stateMachineConfig.init();
+
+            // Create state machine engine
+            ProcessCtrlStateMachineEngine engine = new 
ProcessCtrlStateMachineEngine();
+            engine.setStateMachineConfig(stateMachineConfig);
+            this.stateMachineEngine = engine;
+
+            LOGGER.info("Real Saga mode executor initialized");
+            LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, 
ORDER_SAGA_NAME);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to initialize Saga state 
machine engine", e);
+        }
+    }
+
+    @Override
+    public TransactionRecord execute() {
+        if (isRealMode()) {
+            return executeRealMode();
+        } else {
+            return executeMockMode();
+        }
+    }
+
+    private TransactionRecord executeMockMode() {
+        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
+        long startTime = System.currentTimeMillis();
+        String xid = null;
+        String status = STATUS_UNKNOWN;
+        int branchCount = config.getBranches();
+        boolean success = false;
+
+        try {
+            tx.begin(30000, "benchmark-saga-tx");
+            xid = tx.getXid();
+
+            // Simulate Saga actions (forward phase)
+            for (int i = 0; i < branchCount; i++) {
+                simulateSagaAction(i);
+            }
+
+            if (shouldRollback()) {
+                // Simulate compensation (backward phase)
+                for (int i = branchCount - 1; i >= 0; i--) {
+                    simulateCompensation(i);
+                }
+                tx.rollback();
+                status = STATUS_COMPENSATED;
+            } else {
+                tx.commit();
+                status = STATUS_COMMITTED;
+                success = true;
+            }
+
+        } catch (TransactionException e) {
+            LOGGER.debug("Transaction failed: {}", e.getMessage());
+            status = STATUS_FAILED;
+            try {
+                if (tx.getStatus() != GlobalStatus.Rollbacked && 
tx.getStatus() != GlobalStatus.RollbackFailed) {
+                    tx.rollback();
+                }
+            } catch (TransactionException rollbackEx) {
+                LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage());
+            }
+        }
+
+        long duration = System.currentTimeMillis() - startTime;
+        return new TransactionRecord(xid, status, duration, branchCount, 
success);
+    }
+
+    private TransactionRecord executeRealMode() {
+        long startTime = System.currentTimeMillis();
+        String businessKey = "benchmark-" + 
BUSINESS_KEY_COUNTER.incrementAndGet();
+        String status = STATUS_UNKNOWN;
+        int branchCount = config.getBranches();
+        boolean success = false;
+
+        try {
+            // Choose state machine based on branch count
+            String stateMachineName = branchCount >= 3 ? ORDER_SAGA_NAME : 
SIMPLE_SAGA_NAME;
+
+            // Prepare start parameters
+            Map<String, Object> startParams = createStartParams();
+
+            // Execute state machine
+            StateMachineInstance instance = 
stateMachineEngine.startWithBusinessKey(
+                    stateMachineName, stateMachineConfig.getDefaultTenantId(), 
businessKey, startParams);
+
+            // Check execution result
+            ExecutionStatus executionStatus = instance.getStatus();
+            ExecutionStatus compensationStatus = 
instance.getCompensationStatus();
+
+            if (ExecutionStatus.SU.equals(executionStatus)) {
+                status = STATUS_COMMITTED;
+                success = true;
+            } else if (ExecutionStatus.FA.equals(executionStatus)) {
+                if (compensationStatus != null) {
+                    if (ExecutionStatus.SU.equals(compensationStatus)) {
+                        status = STATUS_COMPENSATED;
+                    } else {
+                        status = STATUS_COMPENSATION_FAILED;
+                    }
+                } else {
+                    status = STATUS_FAILED;
+                }
+            } else if (ExecutionStatus.UN.equals(executionStatus)) {
+                status = STATUS_UNKNOWN;
+            } else {
+                status = executionStatus != null ? executionStatus.name() : 
STATUS_UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            LOGGER.debug("Saga execution failed: {}", e.getMessage());
+            status = STATUS_FAILED;
+        }
+
+        long duration = System.currentTimeMillis() - startTime;
+        return new TransactionRecord(businessKey, status, duration, 
branchCount, success);
+    }
+
+    @SuppressWarnings("lgtm[java/insecure-randomness]")
+    private Map<String, Object> createStartParams() {
+        Map<String, Object> params = new HashMap<>();
+        params.put("userId", "user-" + 
ThreadLocalRandom.current().nextInt(1000));
+        params.put("productId", "product-" + 
ThreadLocalRandom.current().nextInt(100));
+        params.put("quantity", ThreadLocalRandom.current().nextInt(10) + 1);
+        params.put("amount", new 
BigDecimal(ThreadLocalRandom.current().nextInt(1000) + 100));
+        params.put("accountId", "account-" + 
ThreadLocalRandom.current().nextInt(1000));
+        return params;
+    }
+
+    private void simulateSagaAction(int branchId) {
+        // Simulated Saga forward action
+        // In real implementation, this would be a @CompensationBusinessAction 
annotated method
+        LOGGER.trace("Executing Saga action for branch {}", branchId);
+    }
+
+    private void simulateCompensation(int branchId) {
+        // Simulated Saga compensation action
+        // In real implementation, this would be the compensationMethod
+        LOGGER.trace("Executing compensation for branch {}", branchId);
+    }
+
+    @SuppressWarnings("lgtm[java/insecure-randomness]")
+    private boolean shouldRollback() {
+        return ThreadLocalRandom.current().nextInt(100) < 
config.getRollbackPercentage();
+    }
+
+    @Override
+    public void destroy() {
+        if (isRealMode()) {
+            destroyRealMode();
+        }
+        LOGGER.info("Saga mode executor destroyed");
+    }
+
+    private void destroyRealMode() {
+        LOGGER.info("Destroying Real Saga mode resources");
+        // StateMachineEngine doesn't have a close method
+        stateMachineEngine = null;
+        stateMachineConfig = null;
+    }
+
+    /**
+     * Custom StateMachineConfig for benchmark testing.
+     */
+    private static class BenchmarkStateMachineConfig extends 
AbstractStateMachineConfig {
+
+        private int rollbackPercentage = 0;
+
+        public void setRollbackPercentage(int rollbackPercentage) {
+            this.rollbackPercentage = rollbackPercentage;
+        }
+
+        @Override
+        public void init() throws Exception {
+            // Load state machine definitions from classpath
+            InputStream simpleSagaStream =
+                    
getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json");
+            InputStream orderSagaStream =
+                    
getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json");
+
+            if (simpleSagaStream == null || orderSagaStream == null) {
+                throw new RuntimeException("Failed to load Saga state machine 
definitions from classpath");
+            }
+
+            setStateMachineDefInputStreamArray(new InputStream[] 
{simpleSagaStream, orderSagaStream});
+
+            // Initialize parent config
+            super.init();
+
+            // Register SpEL expression factory for parameter evaluation
+            ExpressionFactoryManager expressionFactoryManager = 
getExpressionFactoryManager();
+            SimpleSpelExpressionFactory spelExpressionFactory = new 
SimpleSpelExpressionFactory();
+            // Register for default type (when expression doesn't start with $)
+            expressionFactoryManager.putExpressionFactory(
+                    ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, 
spelExpressionFactory);
+            // Register for empty type (when expression is like $.xxx where 
type is empty string)
+            expressionFactoryManager.putExpressionFactory("", 
spelExpressionFactory);
+
+            // Register benchmark services with the service invoker manager
+            BenchmarkServiceInvoker serviceInvoker = new 
BenchmarkServiceInvoker();
+
+            // Register services with configured rollback percentage
+            // Divide rollback percentage by 3 for each service so total 
probability is approximately correct
+            int serviceRollbackPct = Math.max(1, rollbackPercentage / 3);
+
+            serviceInvoker.registerService("orderService", new 
OrderSagaService(serviceRollbackPct, 5));
+            serviceInvoker.registerService("inventoryService", new 
InventorySagaService(serviceRollbackPct, 5));
+            serviceInvoker.registerService("paymentService", new 
PaymentSagaService(serviceRollbackPct, 5));
+
+            // Register the service invoker for different service types
+            
getServiceInvokerManager().putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN,
 serviceInvoker);

Review Comment:
   The InputStreams loaded from the classpath are never explicitly closed. 
While they are passed to setStateMachineDefInputStreamArray and may be closed 
internally, it's not guaranteed. Consider wrapping the resource loading in a 
try-with-resources block or documenting that the parent class handles stream 
closure.
   ```suggestion
               try (InputStream simpleSagaStream =
                           
getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json");
                    InputStream orderSagaStream =
                           
getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json"))
 {
   
                   if (simpleSagaStream == null || orderSagaStream == null) {
                       throw new RuntimeException("Failed to load Saga state 
machine definitions from classpath");
                   }
   
                   setStateMachineDefInputStreamArray(new InputStream[] 
{simpleSagaStream, orderSagaStream});
   
                   // Initialize parent config
                   super.init();
   
                   // Register SpEL expression factory for parameter evaluation
                   ExpressionFactoryManager expressionFactoryManager = 
getExpressionFactoryManager();
                   SimpleSpelExpressionFactory spelExpressionFactory = new 
SimpleSpelExpressionFactory();
                   // Register for default type (when expression doesn't start 
with $)
                   expressionFactoryManager.putExpressionFactory(
                           ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, 
spelExpressionFactory);
                   // Register for empty type (when expression is like $.xxx 
where type is empty string)
                   expressionFactoryManager.putExpressionFactory("", 
spelExpressionFactory);
   
                   // Register benchmark services with the service invoker 
manager
                   BenchmarkServiceInvoker serviceInvoker = new 
BenchmarkServiceInvoker();
   
                   // Register services with configured rollback percentage
                   // Divide rollback percentage by 3 for each service so total 
probability is approximately correct
                   int serviceRollbackPct = Math.max(1, rollbackPercentage / 3);
   
                   serviceInvoker.registerService("orderService", new 
OrderSagaService(serviceRollbackPct, 5));
                   serviceInvoker.registerService("inventoryService", new 
InventorySagaService(serviceRollbackPct, 5));
                   serviceInvoker.registerService("paymentService", new 
PaymentSagaService(serviceRollbackPct, 5));
   
                   // Register the service invoker for different service types
                   
getServiceInvokerManager().putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN,
 serviceInvoker);
               }
   ```



##########
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/model/BenchmarkMetrics.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.benchmark.model;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Benchmark performance metrics
+ */
+public class BenchmarkMetrics {
+
+    private final AtomicLong totalCount = new AtomicLong(0);
+    private final AtomicLong successCount = new AtomicLong(0);
+    private final AtomicLong failedCount = new AtomicLong(0);
+    private final List<Long> latencies = Collections.synchronizedList(new 
ArrayList<>());
+    private final long startTime = System.currentTimeMillis();
+
+    private volatile long lastCountSnapshot = 0;
+    private volatile long lastSnapshotTime = System.currentTimeMillis();
+
+    public void recordSuccess(long latencyMs) {
+        totalCount.incrementAndGet();
+        successCount.incrementAndGet();
+        latencies.add(latencyMs);
+    }
+
+    public void recordFailure(long latencyMs) {
+        totalCount.incrementAndGet();
+        failedCount.incrementAndGet();
+        latencies.add(latencyMs);
+    }
+
+    public long getTotalCount() {
+        return totalCount.get();
+    }
+
+    public long getSuccessCount() {
+        return successCount.get();
+    }
+
+    public long getFailedCount() {
+        return failedCount.get();
+    }
+
+    public double getSuccessRate() {
+        long total = totalCount.get();
+        if (total == 0) {
+            return 0.0;
+        }
+        return (double) successCount.get() / total * 100;
+    }
+
+    public double getAverageTps() {
+        long elapsed = System.currentTimeMillis() - startTime;
+        if (elapsed == 0) {
+            return 0.0;
+        }
+        return (double) totalCount.get() / elapsed * 1000;
+    }
+
+    public synchronized double getCurrentTps() {
+        long currentTime = System.currentTimeMillis();
+        long currentCount = totalCount.get();
+        long elapsed = currentTime - lastSnapshotTime;
+
+        if (elapsed == 0) {
+            return 0.0;
+        }
+
+        double tps = (double) (currentCount - lastCountSnapshot) / elapsed * 
1000;
+        lastSnapshotTime = currentTime;
+        lastCountSnapshot = currentCount;
+        return tps;
+    }
+
+    public long getElapsedTimeSeconds() {
+        return (System.currentTimeMillis() - startTime) / 1000;
+    }
+
+    public LatencyStats getLatencyStats() {
+        if (latencies.isEmpty()) {
+            return new LatencyStats(0, 0, 0, 0);
+        }
+
+        List<Long> sortedLatencies = new ArrayList<>(latencies);
+        Collections.sort(sortedLatencies);
+
+        int size = sortedLatencies.size();
+        long p50 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.5) 
- 1));
+        long p95 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 
0.95) - 1));
+        long p99 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 
0.99) - 1));
+        long max = sortedLatencies.get(size - 1);
+
+        return new LatencyStats(p50, p95, p99, max);
+    }

Review Comment:
   The getLatencyStats method creates a full copy of the latencies list and 
sorts it on every call. This is inefficient and can cause performance issues 
and memory pressure, especially with large datasets. Consider using a streaming 
percentile algorithm (like t-digest) or caching the sorted results and 
invalidating on updates.



##########
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/model/BenchmarkMetrics.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.benchmark.model;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Benchmark performance metrics
+ */
+public class BenchmarkMetrics {
+
+    private final AtomicLong totalCount = new AtomicLong(0);
+    private final AtomicLong successCount = new AtomicLong(0);
+    private final AtomicLong failedCount = new AtomicLong(0);
+    private final List<Long> latencies = Collections.synchronizedList(new 
ArrayList<>());
+    private final long startTime = System.currentTimeMillis();
+
+    private volatile long lastCountSnapshot = 0;
+    private volatile long lastSnapshotTime = System.currentTimeMillis();
+
+    public void recordSuccess(long latencyMs) {
+        totalCount.incrementAndGet();
+        successCount.incrementAndGet();
+        latencies.add(latencyMs);
+    }
+
+    public void recordFailure(long latencyMs) {
+        totalCount.incrementAndGet();
+        failedCount.incrementAndGet();
+        latencies.add(latencyMs);

Review Comment:
   The latencies list has unbounded growth which can cause OutOfMemoryError on 
long-running or high-volume benchmarks. The PR description mentions "latency 
sampling" with a max of 500K samples to prevent OOM, but this implementation 
doesn't enforce any limit. Consider implementing a sampling strategy or bounded 
collection as described in the design notes.



##########
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/ATModeExecutor.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.benchmark.executor;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.seata.benchmark.config.BenchmarkConfig;
+import org.apache.seata.benchmark.constant.BenchmarkConstants;
+import org.apache.seata.rm.datasource.DataSourceProxy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * AT mode transaction executor supporting both empty and real transaction 
modes
+ * - branches == 0: Empty transaction mode (pure Seata protocol overhead 
testing)
+ * - branches > 0: Real mode with MySQL database (via Testcontainers)
+ */
+public class ATModeExecutor extends AbstractTransactionExecutor {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ATModeExecutor.class);
+
+    private MySQLContainer<?> mysqlContainer;
+    private HikariDataSource rawDataSource;
+    private DataSourceProxy dataSourceProxy;
+
+    public ATModeExecutor(BenchmarkConfig config) {
+        super(config);
+    }
+
+    private boolean isRealMode() {
+        return config.getBranches() > 0;
+    }
+
+    @Override
+    public void init() {
+        if (isRealMode()) {
+            LOGGER.info("Initializing AT mode executor (MySQL via 
Testcontainers)");
+            initRealMode();
+        } else {
+            LOGGER.info("AT mode executor initialized (empty transaction 
mode)");
+        }
+    }
+
+    private void initRealMode() {
+        // Start MySQL container
+        startMySQLContainer();
+
+        // Create HikariCP connection pool
+        createDataSource();
+
+        // Initialize database schema and data
+        initDatabase();
+
+        // Wrap with Seata DataSourceProxy for AT mode
+        dataSourceProxy = new DataSourceProxy(rawDataSource);
+
+        LOGGER.info("DataSourceProxy initialized, dbType: {}", 
dataSourceProxy.getDbType());
+        LOGGER.info("Real AT mode executor initialized with {} accounts", 
BenchmarkConstants.ACCOUNT_COUNT);
+    }
+
+    private void startMySQLContainer() {
+        LOGGER.info("Starting MySQL container...");
+        mysqlContainer = new 
MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                .withDatabaseName("benchmark")
+                .withUsername("test")
+                .withPassword("test")
+                .withCommand("--character-set-server=utf8mb4", 
"--collation-server=utf8mb4_unicode_ci");
+
+        mysqlContainer.start();
+
+        LOGGER.info("MySQL container started: {}", 
mysqlContainer.getJdbcUrl());
+    }
+
+    private void createDataSource() {
+        HikariConfig hikariConfig = new HikariConfig();
+        hikariConfig.setJdbcUrl(mysqlContainer.getJdbcUrl());
+        hikariConfig.setUsername(mysqlContainer.getUsername());
+        hikariConfig.setPassword(mysqlContainer.getPassword());
+        hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver");
+        hikariConfig.setMaximumPoolSize(config.getThreads() * 2);
+        hikariConfig.setMinimumIdle(config.getThreads());
+        hikariConfig.setConnectionTimeout(30000);
+        hikariConfig.setIdleTimeout(600000);
+        hikariConfig.setMaxLifetime(1800000);
+
+        rawDataSource = new HikariDataSource(hikariConfig);
+        LOGGER.info("HikariCP DataSource created");
+    }
+
+    private void initDatabase() {
+        try (Connection conn = rawDataSource.getConnection();
+                Statement stmt = conn.createStatement()) {
+
+            // Create accounts table
+            stmt.execute("CREATE TABLE IF NOT EXISTS accounts ("
+                    + "id BIGINT PRIMARY KEY, "
+                    + "balance INT NOT NULL, "
+                    + "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP"
+                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4");
+
+            // Create undo_log table for Seata AT mode (MySQL syntax)
+            stmt.execute("CREATE TABLE IF NOT EXISTS undo_log ("
+                    + "branch_id BIGINT NOT NULL, "
+                    + "xid VARCHAR(128) NOT NULL, "
+                    + "context VARCHAR(128) NOT NULL, "
+                    + "rollback_info LONGBLOB NOT NULL, "
+                    + "log_status INT NOT NULL, "
+                    + "log_created DATETIME(6) NOT NULL, "
+                    + "log_modified DATETIME(6) NOT NULL, "
+                    + "UNIQUE KEY ux_undo_log (xid, branch_id)"
+                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4");
+
+            // Insert test data
+            stmt.execute("TRUNCATE TABLE accounts");
+            try (PreparedStatement pstmt = conn.prepareStatement("INSERT INTO 
accounts (id, balance) VALUES (?, ?)")) {
+                for (int i = 1; i <= BenchmarkConstants.ACCOUNT_COUNT; i++) {
+                    pstmt.setLong(1, i);
+                    pstmt.setInt(2, BenchmarkConstants.INITIAL_BALANCE);
+                    pstmt.addBatch();
+                    if (i % 100 == 0) {
+                        pstmt.executeBatch();
+                    }
+                }
+                pstmt.executeBatch();
+            }
+

Review Comment:
   The final executeBatch() call after the loop (line 146) already flushes any 
rows left over when the account count is not a multiple of 100, so the last 
partial batch is inserted correctly and there is no bug here. Still, the logic 
could be clearer - consider flushing the final batch explicitly inside the 
loop, as in the suggestion below, which makes the trailing executeBatch() call 
unnecessary.
   ```suggestion
                       if (i % 100 == 0 || i == 
BenchmarkConstants.ACCOUNT_COUNT) {
                           pstmt.executeBatch();
                       }
                   }
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to