Copilot commented on code in PR #7865: URL: https://github.com/apache/incubator-seata/pull/7865#discussion_r2670748469
########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/ATModeExecutor.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.executor; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.constant.BenchmarkConstants; +import org.apache.seata.rm.datasource.DataSourceProxy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.ThreadLocalRandom; + +/** + * AT mode transaction executor supporting both empty and real transaction modes + * - branches == 0: Empty transaction mode (pure Seata protocol overhead testing) + * - branches > 0: Real mode with MySQL database (via Testcontainers) + */ +public class ATModeExecutor extends AbstractTransactionExecutor { + + private static final Logger LOGGER = 
LoggerFactory.getLogger(ATModeExecutor.class); + + private MySQLContainer<?> mysqlContainer; + private HikariDataSource rawDataSource; + private DataSourceProxy dataSourceProxy; + + public ATModeExecutor(BenchmarkConfig config) { + super(config); + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + LOGGER.info("Initializing AT mode executor (MySQL via Testcontainers)"); + initRealMode(); + } else { + LOGGER.info("AT mode executor initialized (empty transaction mode)"); + } + } + + private void initRealMode() { + // Start MySQL container + startMySQLContainer(); + + // Create HikariCP connection pool + createDataSource(); + + // Initialize database schema and data + initDatabase(); + + // Wrap with Seata DataSourceProxy for AT mode + dataSourceProxy = new DataSourceProxy(rawDataSource); + + LOGGER.info("DataSourceProxy initialized, dbType: {}", dataSourceProxy.getDbType()); + LOGGER.info("Real AT mode executor initialized with {} accounts", BenchmarkConstants.ACCOUNT_COUNT); + } + + private void startMySQLContainer() { + LOGGER.info("Starting MySQL container..."); + mysqlContainer = new MySQLContainer<>(DockerImageName.parse("mysql:8.0")) + .withDatabaseName("benchmark") + .withUsername("test") + .withPassword("test") + .withCommand("--character-set-server=utf8mb4", "--collation-server=utf8mb4_unicode_ci"); + + mysqlContainer.start(); + + LOGGER.info("MySQL container started: {}", mysqlContainer.getJdbcUrl()); + } + + private void createDataSource() { + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setJdbcUrl(mysqlContainer.getJdbcUrl()); + hikariConfig.setUsername(mysqlContainer.getUsername()); + hikariConfig.setPassword(mysqlContainer.getPassword()); + hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver"); + hikariConfig.setMaximumPoolSize(config.getThreads() * 2); + hikariConfig.setMinimumIdle(config.getThreads()); + 
hikariConfig.setConnectionTimeout(30000); + hikariConfig.setIdleTimeout(600000); + hikariConfig.setMaxLifetime(1800000); + + rawDataSource = new HikariDataSource(hikariConfig); + LOGGER.info("HikariCP DataSource created"); + } + + private void initDatabase() { + try (Connection conn = rawDataSource.getConnection(); + Statement stmt = conn.createStatement()) { + + // Create accounts table + stmt.execute("CREATE TABLE IF NOT EXISTS accounts (" + + "id BIGINT PRIMARY KEY, " + + "balance INT NOT NULL, " + + "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"); + + // Create undo_log table for Seata AT mode (MySQL syntax) + stmt.execute("CREATE TABLE IF NOT EXISTS undo_log (" + + "branch_id BIGINT NOT NULL, " + + "xid VARCHAR(128) NOT NULL, " + + "context VARCHAR(128) NOT NULL, " + + "rollback_info LONGBLOB NOT NULL, " + + "log_status INT NOT NULL, " + + "log_created DATETIME(6) NOT NULL, " + + "log_modified DATETIME(6) NOT NULL, " + + "UNIQUE KEY ux_undo_log (xid, branch_id)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"); + + // Insert test data + stmt.execute("TRUNCATE TABLE accounts"); + try (PreparedStatement pstmt = conn.prepareStatement("INSERT INTO accounts (id, balance) VALUES (?, ?)")) { + for (int i = 1; i <= BenchmarkConstants.ACCOUNT_COUNT; i++) { + pstmt.setLong(1, i); + pstmt.setInt(2, BenchmarkConstants.INITIAL_BALANCE); + pstmt.addBatch(); + if (i % 100 == 0) { + pstmt.executeBatch(); + } + } + pstmt.executeBatch(); + } + + LOGGER.info( + "Database initialized: {} accounts with balance {}", + BenchmarkConstants.ACCOUNT_COUNT, + BenchmarkConstants.INITIAL_BALANCE); + + } catch (SQLException e) { + throw new RuntimeException("Failed to initialize database", e); + } + } + + @Override + protected String getTransactionName() { + return isRealMode() ? 
"benchmark-real-at-tx" : "benchmark-at-tx"; + } + + @Override + protected int getBranchCount() { + return config.getBranches(); + } + + @Override + protected void executeBusinessLogic() throws Exception { + if (isRealMode()) { + executeBranchOperations(config.getBranches()); + } + // Empty mode: do nothing (pure Seata protocol overhead testing) + } + + private void executeBranchOperations(int branchCount) throws SQLException { + // Execute N branch operations (simulating distributed transaction branches) + for (int i = 0; i < branchCount; i++) { + executeSingleBranch(); + } + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private void executeSingleBranch() throws SQLException { + // Use DataSourceProxy connection to enable AT mode + try (Connection conn = dataSourceProxy.getConnection()) { + conn.setAutoCommit(false); + + // Transfer between two random accounts + long fromAccount = (ThreadLocalRandom.current().nextInt(BenchmarkConstants.ACCOUNT_COUNT) + 1); + long toAccount = (ThreadLocalRandom.current().nextInt(BenchmarkConstants.ACCOUNT_COUNT) + 1); + while (toAccount == fromAccount) { + toAccount = (ThreadLocalRandom.current().nextInt(BenchmarkConstants.ACCOUNT_COUNT) + 1); + } + int amount = ThreadLocalRandom.current().nextInt(BenchmarkConstants.MAX_TRANSFER_AMOUNT) + + BenchmarkConstants.MIN_TRANSFER_AMOUNT; + + // Debit from source account + try (PreparedStatement pstmt = + conn.prepareStatement("UPDATE accounts SET balance = balance - ? WHERE id = ?")) { + pstmt.setInt(1, amount); + pstmt.setLong(2, fromAccount); + pstmt.executeUpdate(); Review Comment: The account balance updates don't check for sufficient balance before debiting, which could result in negative balances. While this is a benchmark tool and not production code, it may lead to unrealistic test scenarios and could mask potential issues with real AT mode transactions. Consider adding a balance check or initializing accounts with sufficiently high balances to prevent this. 
```suggestion // Debit from source account, ensuring sufficient balance to avoid negative values try (PreparedStatement pstmt = conn.prepareStatement("UPDATE accounts SET balance = balance - ? WHERE id = ? AND balance >= ?")) { pstmt.setInt(1, amount); pstmt.setLong(2, fromAccount); pstmt.setInt(3, amount); int updatedRows = pstmt.executeUpdate(); // If no row was updated, there was insufficient balance; roll back and skip this transfer if (updatedRows == 0) { conn.rollback(); return; } ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/saga/BenchmarkServiceInvoker.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.saga; + +import org.apache.seata.saga.engine.invoker.ServiceInvoker; +import org.apache.seata.saga.statelang.domain.ServiceTaskState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Service invoker for benchmark Saga services. + * Invokes registered service methods based on service name and method name. 
+ */ +public class BenchmarkServiceInvoker implements ServiceInvoker { + + private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkServiceInvoker.class); + + private final Map<String, Object> serviceRegistry = new HashMap<>(); + + public void registerService(String serviceName, Object service) { + serviceRegistry.put(serviceName, service); + LOGGER.debug("Registered service: {}", serviceName); + } + + @Override + public Object invoke(ServiceTaskState serviceTaskState, Object... input) throws Exception { + String serviceName = serviceTaskState.getServiceName(); + String methodName = serviceTaskState.getServiceMethod(); + + Object service = serviceRegistry.get(serviceName); + if (service == null) { + throw new IllegalArgumentException("Service not found: " + serviceName); + } + + // Find the method + Method method = findMethod(service.getClass(), methodName); + if (method == null) { + throw new NoSuchMethodException("Method not found: " + methodName + " in service: " + serviceName); + } + + // Prepare input parameters + Object inputParam = prepareInput(input); + + LOGGER.debug("Invoking service: {}.{}()", serviceName, methodName); + + // Invoke the method + return method.invoke(service, inputParam); + } + + private Method findMethod(Class<?> clazz, String methodName) { + for (Method method : clazz.getMethods()) { + if (method.getName().equals(methodName)) { + return method; + } + } + return null; + } Review Comment: The findMethod implementation returns the first method matching the name without checking parameter types or count. If a class has overloaded methods with the same name, this will return an arbitrary match which may not be the intended method. Consider checking parameter compatibility or documenting that overloaded methods are not supported. 
########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/config/BenchmarkConfigLoader.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.config; + +import org.apache.seata.common.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.LoaderOptions; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.Constructor; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Configuration loader for benchmark + */ +public class BenchmarkConfigLoader { + + private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkConfigLoader.class); + private static final String DEFAULT_CONFIG_FILE = "benchmark.yaml"; + private static final String ENV_CONFIG_FILE = "BENCHMARK_CONFIG_FILE"; + private static final String SYS_CONFIG_FILE = "benchmark.config.file"; + + public static BenchmarkConfig load(String configFile) throws IOException { + BenchmarkConfig config; + String configSource; + + if (configFile != null && !configFile.isEmpty()) { + config = loadFromFile(configFile); + configSource = "CLI argument: " + 
configFile; + } else { + String envConfig = System.getenv(ENV_CONFIG_FILE); + if (envConfig != null && !envConfig.isEmpty()) { + config = loadFromFile(envConfig); + configSource = "Environment variable " + ENV_CONFIG_FILE + ": " + envConfig; + } else { + String sysConfig = System.getProperty(SYS_CONFIG_FILE); + if (sysConfig != null && !sysConfig.isEmpty()) { + config = loadFromFile(sysConfig); + configSource = "System property " + SYS_CONFIG_FILE + ": " + sysConfig; + } else { + config = loadDefault(); + configSource = "Default classpath: " + DEFAULT_CONFIG_FILE; + } + } + } + + LOGGER.info("Loading configuration from: {}", configSource); + config.validate(); + return config; + } + + private static BenchmarkConfig loadFromFile(String filePath) throws IOException { + try (FileInputStream fis = new FileInputStream(filePath)) { + return loadFromStream(fis); + } + } + + public static BenchmarkConfig loadDefault() throws IOException { + try (InputStream is = BenchmarkConfigLoader.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG_FILE)) { + if (is == null) { + LOGGER.warn("Default configuration file not found, using built-in defaults"); + return new BenchmarkConfig(); + } + return loadFromStream(is); + } + } + + private static BenchmarkConfig loadFromStream(InputStream inputStream) { + LoaderOptions loaderOptions = new LoaderOptions(); + Constructor constructor = new Constructor(BenchmarkConfig.class, loaderOptions); + Yaml yaml = new Yaml(constructor); + return yaml.load(inputStream); Review Comment: The SnakeYAML library should be configured with SafeConstructor instead of Constructor to prevent arbitrary code execution via YAML deserialization. This is a known security vulnerability (CVE-2022-1471 and related). Use SafeConstructor or configure LoaderOptions to restrict allowed types. 
########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.executor; + +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker; +import org.apache.seata.benchmark.saga.InventorySagaService; +import org.apache.seata.benchmark.saga.OrderSagaService; +import org.apache.seata.benchmark.saga.PaymentSagaService; +import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.saga.engine.StateMachineEngine; +import org.apache.seata.saga.engine.config.AbstractStateMachineConfig; +import org.apache.seata.saga.engine.expression.ExpressionFactoryManager; +import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine; +import org.apache.seata.saga.statelang.domain.DomainConstants; +import org.apache.seata.saga.statelang.domain.ExecutionStatus; +import 
org.apache.seata.saga.statelang.domain.StateMachineInstance; +import org.apache.seata.tm.api.GlobalTransaction; +import org.apache.seata.tm.api.GlobalTransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN; + +/** + * Saga mode transaction executor supporting both mock and real modes + * - branches == 0: Mock mode (simplified Saga simulation without state machine) + * - branches > 0: Real mode (state machine engine with compensation support) + */ +public class SagaModeExecutor implements TransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SagaModeExecutor.class); + private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0); + + private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga"; + private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga"; + + private final BenchmarkConfig config; + private StateMachineEngine stateMachineEngine; + private BenchmarkStateMachineConfig stateMachineConfig; + + public SagaModeExecutor(BenchmarkConfig config) { + this.config = config; + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + initRealMode(); + } else { + LOGGER.info("Saga mode executor initialized (simplified mock mode)"); + LOGGER.info("Note: Full Saga 
annotation support requires Spring framework integration"); + } + } + + private void initRealMode() { + LOGGER.info("Initializing Real Saga mode executor with state machine engine"); + + try { + // Create and configure state machine config + stateMachineConfig = new BenchmarkStateMachineConfig(); + stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage()); + stateMachineConfig.init(); + + // Create state machine engine + ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine(); + engine.setStateMachineConfig(stateMachineConfig); + this.stateMachineEngine = engine; + + LOGGER.info("Real Saga mode executor initialized"); + LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, ORDER_SAGA_NAME); + + } catch (Exception e) { + throw new RuntimeException("Failed to initialize Saga state machine engine", e); + } + } + + @Override + public TransactionRecord execute() { + if (isRealMode()) { + return executeRealMode(); + } else { + return executeMockMode(); + } + } + + private TransactionRecord executeMockMode() { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + long startTime = System.currentTimeMillis(); + String xid = null; + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + tx.begin(30000, "benchmark-saga-tx"); + xid = tx.getXid(); + + // Simulate Saga actions (forward phase) + for (int i = 0; i < branchCount; i++) { + simulateSagaAction(i); + } + + if (shouldRollback()) { + // Simulate compensation (backward phase) + for (int i = branchCount - 1; i >= 0; i--) { + simulateCompensation(i); + } + tx.rollback(); + status = STATUS_COMPENSATED; + } else { + tx.commit(); + status = STATUS_COMMITTED; + success = true; + } + + } catch (TransactionException e) { + LOGGER.debug("Transaction failed: {}", e.getMessage()); + status = STATUS_FAILED; + try { + if (tx.getStatus() != GlobalStatus.Rollbacked && tx.getStatus() != GlobalStatus.RollbackFailed) 
{ + tx.rollback(); + } + } catch (TransactionException rollbackEx) { + LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage()); + } + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(xid, status, duration, branchCount, success); + } + + private TransactionRecord executeRealMode() { + long startTime = System.currentTimeMillis(); + String businessKey = "benchmark-" + BUSINESS_KEY_COUNTER.incrementAndGet(); + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + // Choose state machine based on branch count + String stateMachineName = branchCount >= 3 ? ORDER_SAGA_NAME : SIMPLE_SAGA_NAME; + + // Prepare start parameters + Map<String, Object> startParams = createStartParams(); + + // Execute state machine + StateMachineInstance instance = stateMachineEngine.startWithBusinessKey( + stateMachineName, stateMachineConfig.getDefaultTenantId(), businessKey, startParams); + + // Check execution result + ExecutionStatus executionStatus = instance.getStatus(); + ExecutionStatus compensationStatus = instance.getCompensationStatus(); + + if (ExecutionStatus.SU.equals(executionStatus)) { + status = STATUS_COMMITTED; + success = true; + } else if (ExecutionStatus.FA.equals(executionStatus)) { + if (compensationStatus != null) { + if (ExecutionStatus.SU.equals(compensationStatus)) { + status = STATUS_COMPENSATED; + } else { + status = STATUS_COMPENSATION_FAILED; + } + } else { + status = STATUS_FAILED; + } + } else if (ExecutionStatus.UN.equals(executionStatus)) { + status = STATUS_UNKNOWN; + } else { + status = executionStatus != null ? 
executionStatus.name() : STATUS_UNKNOWN; + } + + } catch (Exception e) { + LOGGER.debug("Saga execution failed: {}", e.getMessage()); + status = STATUS_FAILED; + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(businessKey, status, duration, branchCount, success); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private Map<String, Object> createStartParams() { + Map<String, Object> params = new HashMap<>(); + params.put("userId", "user-" + ThreadLocalRandom.current().nextInt(1000)); + params.put("productId", "product-" + ThreadLocalRandom.current().nextInt(100)); + params.put("quantity", ThreadLocalRandom.current().nextInt(10) + 1); + params.put("amount", new BigDecimal(ThreadLocalRandom.current().nextInt(1000) + 100)); + params.put("accountId", "account-" + ThreadLocalRandom.current().nextInt(1000)); + return params; + } + + private void simulateSagaAction(int branchId) { + // Simulated Saga forward action + // In real implementation, this would be a @CompensationBusinessAction annotated method + LOGGER.trace("Executing Saga action for branch {}", branchId); + } + + private void simulateCompensation(int branchId) { + // Simulated Saga compensation action + // In real implementation, this would be the compensationMethod + LOGGER.trace("Executing compensation for branch {}", branchId); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private boolean shouldRollback() { + return ThreadLocalRandom.current().nextInt(100) < config.getRollbackPercentage(); + } + + @Override + public void destroy() { + if (isRealMode()) { + destroyRealMode(); + } + LOGGER.info("Saga mode executor destroyed"); + } + + private void destroyRealMode() { + LOGGER.info("Destroying Real Saga mode resources"); + // StateMachineEngine doesn't have a close method + stateMachineEngine = null; + stateMachineConfig = null; + } + + /** + * Custom StateMachineConfig for benchmark testing. 
+ */ + private static class BenchmarkStateMachineConfig extends AbstractStateMachineConfig { + + private int rollbackPercentage = 0; + + public void setRollbackPercentage(int rollbackPercentage) { + this.rollbackPercentage = rollbackPercentage; + } + + @Override + public void init() throws Exception { + // Load state machine definitions from classpath + InputStream simpleSagaStream = + getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json"); + InputStream orderSagaStream = + getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json"); + + if (simpleSagaStream == null || orderSagaStream == null) { + throw new RuntimeException("Failed to load Saga state machine definitions from classpath"); + } + + setStateMachineDefInputStreamArray(new InputStream[] {simpleSagaStream, orderSagaStream}); + + // Initialize parent config + super.init(); + + // Register SpEL expression factory for parameter evaluation + ExpressionFactoryManager expressionFactoryManager = getExpressionFactoryManager(); + SimpleSpelExpressionFactory spelExpressionFactory = new SimpleSpelExpressionFactory(); + // Register for default type (when expression doesn't start with $) + expressionFactoryManager.putExpressionFactory( + ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, spelExpressionFactory); + // Register for empty type (when expression is like $.xxx where type is empty string) + expressionFactoryManager.putExpressionFactory("", spelExpressionFactory); + + // Register benchmark services with the service invoker manager + BenchmarkServiceInvoker serviceInvoker = new BenchmarkServiceInvoker(); + + // Register services with configured rollback percentage + // Divide rollback percentage by 3 for each service so total probability is approximately correct + int serviceRollbackPct = Math.max(1, rollbackPercentage / 3); + + serviceInvoker.registerService("orderService", new OrderSagaService(serviceRollbackPct, 5)); + 
serviceInvoker.registerService("inventoryService", new InventorySagaService(serviceRollbackPct, 5)); + serviceInvoker.registerService("paymentService", new PaymentSagaService(serviceRollbackPct, 5)); Review Comment: The rollback percentage distribution logic divides by 3 and uses Math.max(1, ...), which means even when rollbackPercentage is 0, each service will have a 1% failure rate. This changes the expected behavior where 0 should mean no failures. Consider using Math.max(0, rollbackPercentage / 3) or skipping the division entirely if rollbackPercentage is 0. ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.benchmark.executor; + +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker; +import org.apache.seata.benchmark.saga.InventorySagaService; +import org.apache.seata.benchmark.saga.OrderSagaService; +import org.apache.seata.benchmark.saga.PaymentSagaService; +import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.saga.engine.StateMachineEngine; +import org.apache.seata.saga.engine.config.AbstractStateMachineConfig; +import org.apache.seata.saga.engine.expression.ExpressionFactoryManager; +import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine; +import org.apache.seata.saga.statelang.domain.DomainConstants; +import org.apache.seata.saga.statelang.domain.ExecutionStatus; +import org.apache.seata.saga.statelang.domain.StateMachineInstance; +import org.apache.seata.tm.api.GlobalTransaction; +import org.apache.seata.tm.api.GlobalTransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN; + +/** + * Saga mode transaction executor supporting both mock and real modes + * - branches == 0: Mock 
mode (simplified Saga simulation without state machine) + * - branches > 0: Real mode (state machine engine with compensation support) + */ +public class SagaModeExecutor implements TransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SagaModeExecutor.class); + private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0); + + private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga"; + private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga"; + + private final BenchmarkConfig config; + private StateMachineEngine stateMachineEngine; + private BenchmarkStateMachineConfig stateMachineConfig; + + public SagaModeExecutor(BenchmarkConfig config) { + this.config = config; + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + initRealMode(); + } else { + LOGGER.info("Saga mode executor initialized (simplified mock mode)"); + LOGGER.info("Note: Full Saga annotation support requires Spring framework integration"); + } + } + + private void initRealMode() { + LOGGER.info("Initializing Real Saga mode executor with state machine engine"); + + try { + // Create and configure state machine config + stateMachineConfig = new BenchmarkStateMachineConfig(); + stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage()); + stateMachineConfig.init(); + + // Create state machine engine + ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine(); + engine.setStateMachineConfig(stateMachineConfig); + this.stateMachineEngine = engine; + + LOGGER.info("Real Saga mode executor initialized"); + LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, ORDER_SAGA_NAME); + + } catch (Exception e) { + throw new RuntimeException("Failed to initialize Saga state machine engine", e); + } + } + + @Override + public TransactionRecord execute() { + if (isRealMode()) { + return executeRealMode(); + } else { + return 
executeMockMode(); + } + } + + private TransactionRecord executeMockMode() { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + long startTime = System.currentTimeMillis(); + String xid = null; + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + tx.begin(30000, "benchmark-saga-tx"); + xid = tx.getXid(); + + // Simulate Saga actions (forward phase) + for (int i = 0; i < branchCount; i++) { + simulateSagaAction(i); + } + + if (shouldRollback()) { + // Simulate compensation (backward phase) + for (int i = branchCount - 1; i >= 0; i--) { + simulateCompensation(i); + } + tx.rollback(); + status = STATUS_COMPENSATED; + } else { + tx.commit(); + status = STATUS_COMMITTED; + success = true; + } + + } catch (TransactionException e) { + LOGGER.debug("Transaction failed: {}", e.getMessage()); + status = STATUS_FAILED; + try { + if (tx.getStatus() != GlobalStatus.Rollbacked && tx.getStatus() != GlobalStatus.RollbackFailed) { + tx.rollback(); + } + } catch (TransactionException rollbackEx) { + LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage()); + } + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(xid, status, duration, branchCount, success); + } + + private TransactionRecord executeRealMode() { + long startTime = System.currentTimeMillis(); + String businessKey = "benchmark-" + BUSINESS_KEY_COUNTER.incrementAndGet(); + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + // Choose state machine based on branch count + String stateMachineName = branchCount >= 3 ? 
ORDER_SAGA_NAME : SIMPLE_SAGA_NAME; + + // Prepare start parameters + Map<String, Object> startParams = createStartParams(); + + // Execute state machine + StateMachineInstance instance = stateMachineEngine.startWithBusinessKey( + stateMachineName, stateMachineConfig.getDefaultTenantId(), businessKey, startParams); + + // Check execution result + ExecutionStatus executionStatus = instance.getStatus(); + ExecutionStatus compensationStatus = instance.getCompensationStatus(); + + if (ExecutionStatus.SU.equals(executionStatus)) { + status = STATUS_COMMITTED; + success = true; + } else if (ExecutionStatus.FA.equals(executionStatus)) { + if (compensationStatus != null) { + if (ExecutionStatus.SU.equals(compensationStatus)) { + status = STATUS_COMPENSATED; + } else { + status = STATUS_COMPENSATION_FAILED; + } + } else { + status = STATUS_FAILED; + } + } else if (ExecutionStatus.UN.equals(executionStatus)) { + status = STATUS_UNKNOWN; + } else { + status = executionStatus != null ? executionStatus.name() : STATUS_UNKNOWN; + } + + } catch (Exception e) { + LOGGER.debug("Saga execution failed: {}", e.getMessage()); + status = STATUS_FAILED; + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(businessKey, status, duration, branchCount, success); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private Map<String, Object> createStartParams() { + Map<String, Object> params = new HashMap<>(); + params.put("userId", "user-" + ThreadLocalRandom.current().nextInt(1000)); + params.put("productId", "product-" + ThreadLocalRandom.current().nextInt(100)); + params.put("quantity", ThreadLocalRandom.current().nextInt(10) + 1); + params.put("amount", new BigDecimal(ThreadLocalRandom.current().nextInt(1000) + 100)); + params.put("accountId", "account-" + ThreadLocalRandom.current().nextInt(1000)); + return params; + } + + private void simulateSagaAction(int branchId) { + // Simulated Saga forward action + // In real implementation, 
this would be a @CompensationBusinessAction annotated method + LOGGER.trace("Executing Saga action for branch {}", branchId); + } + + private void simulateCompensation(int branchId) { + // Simulated Saga compensation action + // In real implementation, this would be the compensationMethod + LOGGER.trace("Executing compensation for branch {}", branchId); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private boolean shouldRollback() { + return ThreadLocalRandom.current().nextInt(100) < config.getRollbackPercentage(); + } + + @Override + public void destroy() { + if (isRealMode()) { + destroyRealMode(); + } + LOGGER.info("Saga mode executor destroyed"); + } + + private void destroyRealMode() { + LOGGER.info("Destroying Real Saga mode resources"); + // StateMachineEngine doesn't have a close method + stateMachineEngine = null; + stateMachineConfig = null; + } + + /** + * Custom StateMachineConfig for benchmark testing. + */ + private static class BenchmarkStateMachineConfig extends AbstractStateMachineConfig { + + private int rollbackPercentage = 0; + + public void setRollbackPercentage(int rollbackPercentage) { + this.rollbackPercentage = rollbackPercentage; + } + + @Override + public void init() throws Exception { + // Load state machine definitions from classpath + InputStream simpleSagaStream = + getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json"); + InputStream orderSagaStream = + getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json"); + + if (simpleSagaStream == null || orderSagaStream == null) { + throw new RuntimeException("Failed to load Saga state machine definitions from classpath"); + } + + setStateMachineDefInputStreamArray(new InputStream[] {simpleSagaStream, orderSagaStream}); + + // Initialize parent config + super.init(); + + // Register SpEL expression factory for parameter evaluation + ExpressionFactoryManager expressionFactoryManager = 
getExpressionFactoryManager(); + SimpleSpelExpressionFactory spelExpressionFactory = new SimpleSpelExpressionFactory(); + // Register for default type (when expression doesn't start with $) + expressionFactoryManager.putExpressionFactory( + ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, spelExpressionFactory); + // Register for empty type (when expression is like $.xxx where type is empty string) + expressionFactoryManager.putExpressionFactory("", spelExpressionFactory); + + // Register benchmark services with the service invoker manager + BenchmarkServiceInvoker serviceInvoker = new BenchmarkServiceInvoker(); + + // Register services with configured rollback percentage + // Divide rollback percentage by 3 for each service so total probability is approximately correct + int serviceRollbackPct = Math.max(1, rollbackPercentage / 3); + + serviceInvoker.registerService("orderService", new OrderSagaService(serviceRollbackPct, 5)); + serviceInvoker.registerService("inventoryService", new InventorySagaService(serviceRollbackPct, 5)); + serviceInvoker.registerService("paymentService", new PaymentSagaService(serviceRollbackPct, 5)); + + // Register the service invoker for different service types + getServiceInvokerManager().putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN, serviceInvoker); Review Comment: The InputStreams loaded from the classpath are never explicitly closed. While they are passed to setStateMachineDefInputStreamArray and may be closed internally, it's not guaranteed. Consider wrapping the resource loading in a try-with-resources block or documenting that the parent class handles stream closure. 
```suggestion try (InputStream simpleSagaStream = getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json"); InputStream orderSagaStream = getClass().getClassLoader().getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json")) { if (simpleSagaStream == null || orderSagaStream == null) { throw new RuntimeException("Failed to load Saga state machine definitions from classpath"); } setStateMachineDefInputStreamArray(new InputStream[] {simpleSagaStream, orderSagaStream}); // Initialize parent config super.init(); // Register SpEL expression factory for parameter evaluation ExpressionFactoryManager expressionFactoryManager = getExpressionFactoryManager(); SimpleSpelExpressionFactory spelExpressionFactory = new SimpleSpelExpressionFactory(); // Register for default type (when expression doesn't start with $) expressionFactoryManager.putExpressionFactory( ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, spelExpressionFactory); // Register for empty type (when expression is like $.xxx where type is empty string) expressionFactoryManager.putExpressionFactory("", spelExpressionFactory); // Register benchmark services with the service invoker manager BenchmarkServiceInvoker serviceInvoker = new BenchmarkServiceInvoker(); // Register services with configured rollback percentage // Divide rollback percentage by 3 for each service so total probability is approximately correct int serviceRollbackPct = Math.max(1, rollbackPercentage / 3); serviceInvoker.registerService("orderService", new OrderSagaService(serviceRollbackPct, 5)); serviceInvoker.registerService("inventoryService", new InventorySagaService(serviceRollbackPct, 5)); serviceInvoker.registerService("paymentService", new PaymentSagaService(serviceRollbackPct, 5)); // Register the service invoker for different service types getServiceInvokerManager().putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN, serviceInvoker); } ``` ########## 
test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/model/BenchmarkMetrics.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.model; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Benchmark performance metrics + */ +public class BenchmarkMetrics { + + private final AtomicLong totalCount = new AtomicLong(0); + private final AtomicLong successCount = new AtomicLong(0); + private final AtomicLong failedCount = new AtomicLong(0); + private final List<Long> latencies = Collections.synchronizedList(new ArrayList<>()); + private final long startTime = System.currentTimeMillis(); + + private volatile long lastCountSnapshot = 0; + private volatile long lastSnapshotTime = System.currentTimeMillis(); + + public void recordSuccess(long latencyMs) { + totalCount.incrementAndGet(); + successCount.incrementAndGet(); + latencies.add(latencyMs); + } + + public void recordFailure(long latencyMs) { + totalCount.incrementAndGet(); + failedCount.incrementAndGet(); + latencies.add(latencyMs); + } + + public long getTotalCount() { + return 
totalCount.get(); + } + + public long getSuccessCount() { + return successCount.get(); + } + + public long getFailedCount() { + return failedCount.get(); + } + + public double getSuccessRate() { + long total = totalCount.get(); + if (total == 0) { + return 0.0; + } + return (double) successCount.get() / total * 100; + } + + public double getAverageTps() { + long elapsed = System.currentTimeMillis() - startTime; + if (elapsed == 0) { + return 0.0; + } + return (double) totalCount.get() / elapsed * 1000; + } + + public synchronized double getCurrentTps() { + long currentTime = System.currentTimeMillis(); + long currentCount = totalCount.get(); + long elapsed = currentTime - lastSnapshotTime; + + if (elapsed == 0) { + return 0.0; + } + + double tps = (double) (currentCount - lastCountSnapshot) / elapsed * 1000; + lastSnapshotTime = currentTime; + lastCountSnapshot = currentCount; + return tps; + } + + public long getElapsedTimeSeconds() { + return (System.currentTimeMillis() - startTime) / 1000; + } + + public LatencyStats getLatencyStats() { + if (latencies.isEmpty()) { + return new LatencyStats(0, 0, 0, 0); + } + + List<Long> sortedLatencies = new ArrayList<>(latencies); + Collections.sort(sortedLatencies); + + int size = sortedLatencies.size(); + long p50 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.5) - 1)); + long p95 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.95) - 1)); + long p99 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.99) - 1)); + long max = sortedLatencies.get(size - 1); + + return new LatencyStats(p50, p95, p99, max); + } Review Comment: The getLatencyStats method creates a full copy of the latencies list and sorts it on every call. This is inefficient and can cause performance issues and memory pressure, especially with large datasets. Consider using a streaming percentile algorithm (like t-digest) or caching the sorted results and invalidating on updates. 
########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/model/BenchmarkMetrics.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.model; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Benchmark performance metrics + */ +public class BenchmarkMetrics { + + private final AtomicLong totalCount = new AtomicLong(0); + private final AtomicLong successCount = new AtomicLong(0); + private final AtomicLong failedCount = new AtomicLong(0); + private final List<Long> latencies = Collections.synchronizedList(new ArrayList<>()); + private final long startTime = System.currentTimeMillis(); + + private volatile long lastCountSnapshot = 0; + private volatile long lastSnapshotTime = System.currentTimeMillis(); + + public void recordSuccess(long latencyMs) { + totalCount.incrementAndGet(); + successCount.incrementAndGet(); + latencies.add(latencyMs); + } + + public void recordFailure(long latencyMs) { + totalCount.incrementAndGet(); + failedCount.incrementAndGet(); + latencies.add(latencyMs); Review Comment: The latencies list has unbounded 
growth which can cause OutOfMemoryError on long-running or high-volume benchmarks. The PR description mentions "latency sampling" with a max of 500K samples to prevent OOM, but this implementation doesn't enforce any limit. Consider implementing a sampling strategy or bounded collection as described in the design notes. ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/ATModeExecutor.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.benchmark.executor; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.constant.BenchmarkConstants; +import org.apache.seata.rm.datasource.DataSourceProxy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.ThreadLocalRandom; + +/** + * AT mode transaction executor supporting both empty and real transaction modes + * - branches == 0: Empty transaction mode (pure Seata protocol overhead testing) + * - branches > 0: Real mode with MySQL database (via Testcontainers) + */ +public class ATModeExecutor extends AbstractTransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(ATModeExecutor.class); + + private MySQLContainer<?> mysqlContainer; + private HikariDataSource rawDataSource; + private DataSourceProxy dataSourceProxy; + + public ATModeExecutor(BenchmarkConfig config) { + super(config); + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + LOGGER.info("Initializing AT mode executor (MySQL via Testcontainers)"); + initRealMode(); + } else { + LOGGER.info("AT mode executor initialized (empty transaction mode)"); + } + } + + private void initRealMode() { + // Start MySQL container + startMySQLContainer(); + + // Create HikariCP connection pool + createDataSource(); + + // Initialize database schema and data + initDatabase(); + + // Wrap with Seata DataSourceProxy for AT mode + dataSourceProxy = new DataSourceProxy(rawDataSource); + + LOGGER.info("DataSourceProxy initialized, dbType: {}", dataSourceProxy.getDbType()); + 
LOGGER.info("Real AT mode executor initialized with {} accounts", BenchmarkConstants.ACCOUNT_COUNT); + } + + private void startMySQLContainer() { + LOGGER.info("Starting MySQL container..."); + mysqlContainer = new MySQLContainer<>(DockerImageName.parse("mysql:8.0")) + .withDatabaseName("benchmark") + .withUsername("test") + .withPassword("test") + .withCommand("--character-set-server=utf8mb4", "--collation-server=utf8mb4_unicode_ci"); + + mysqlContainer.start(); + + LOGGER.info("MySQL container started: {}", mysqlContainer.getJdbcUrl()); + } + + private void createDataSource() { + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setJdbcUrl(mysqlContainer.getJdbcUrl()); + hikariConfig.setUsername(mysqlContainer.getUsername()); + hikariConfig.setPassword(mysqlContainer.getPassword()); + hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver"); + hikariConfig.setMaximumPoolSize(config.getThreads() * 2); + hikariConfig.setMinimumIdle(config.getThreads()); + hikariConfig.setConnectionTimeout(30000); + hikariConfig.setIdleTimeout(600000); + hikariConfig.setMaxLifetime(1800000); + + rawDataSource = new HikariDataSource(hikariConfig); + LOGGER.info("HikariCP DataSource created"); + } + + private void initDatabase() { + try (Connection conn = rawDataSource.getConnection(); + Statement stmt = conn.createStatement()) { + + // Create accounts table + stmt.execute("CREATE TABLE IF NOT EXISTS accounts (" + + "id BIGINT PRIMARY KEY, " + + "balance INT NOT NULL, " + + "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"); + + // Create undo_log table for Seata AT mode (MySQL syntax) + stmt.execute("CREATE TABLE IF NOT EXISTS undo_log (" + + "branch_id BIGINT NOT NULL, " + + "xid VARCHAR(128) NOT NULL, " + + "context VARCHAR(128) NOT NULL, " + + "rollback_info LONGBLOB NOT NULL, " + + "log_status INT NOT NULL, " + + "log_created DATETIME(6) NOT NULL, " + + "log_modified DATETIME(6) NOT NULL, " + 
+ "UNIQUE KEY ux_undo_log (xid, branch_id)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"); + + // Insert test data + stmt.execute("TRUNCATE TABLE accounts"); + try (PreparedStatement pstmt = conn.prepareStatement("INSERT INTO accounts (id, balance) VALUES (?, ?)")) { + for (int i = 1; i <= BenchmarkConstants.ACCOUNT_COUNT; i++) { + pstmt.setLong(1, i); + pstmt.setInt(2, BenchmarkConstants.INITIAL_BALANCE); + pstmt.addBatch(); + if (i % 100 == 0) { + pstmt.executeBatch(); + } + } + pstmt.executeBatch(); + } + Review Comment: The account inserts are batched correctly: when the total count is not a multiple of 100, the executeBatch() call after the loop (line 146) flushes the remaining rows of the last incomplete batch, so no inserts are lost. Still, the logic could be clearer - consider folding the final flush into the loop condition so that there is a single executeBatch() call site. ```suggestion if (i % 100 == 0 || i == BenchmarkConstants.ACCOUNT_COUNT) { pstmt.executeBatch(); } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
