Copilot commented on code in PR #7865: URL: https://github.com/apache/incubator-seata/pull/7865#discussion_r2676381268
########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/model/BenchmarkMetrics.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.model; + +import org.apache.seata.benchmark.constant.BenchmarkConstants; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Benchmark performance metrics + */ +public class BenchmarkMetrics { + + private final AtomicLong totalCount = new AtomicLong(0); + private final AtomicLong successCount = new AtomicLong(0); + private final AtomicLong failedCount = new AtomicLong(0); + private final List<Long> latencies = + Collections.synchronizedList(new ArrayList<>(BenchmarkConstants.MAX_LATENCY_SAMPLES)); + private final AtomicLong totalSamples = new AtomicLong(0); + private final long startTime = System.currentTimeMillis(); + + private volatile long lastCountSnapshot = 0; + private volatile long lastSnapshotTime = System.currentTimeMillis(); + + private volatile LatencyStats cachedStats = null; + private volatile long lastStatsUpdateTime = 0; + + public void 
recordSuccess(long latencyMs) { + totalCount.incrementAndGet(); + successCount.incrementAndGet(); + addLatencySample(latencyMs); + } + + public void recordFailure(long latencyMs) { + totalCount.incrementAndGet(); + failedCount.incrementAndGet(); + addLatencySample(latencyMs); + } + + private void addLatencySample(long latencyMs) { + totalSamples.incrementAndGet(); + synchronized (latencies) { + if (latencies.size() < BenchmarkConstants.MAX_LATENCY_SAMPLES) { + latencies.add(latencyMs); + } else { + // Random replacement strategy to maintain bounded samples + int index = ThreadLocalRandom.current().nextInt(BenchmarkConstants.MAX_LATENCY_SAMPLES); + latencies.set(index, latencyMs); Review Comment: In the random replacement strategy, every new sample unconditionally overwrites a random slot once the buffer is full, so the retained samples are biased toward recent measurements and may not be uniformly distributed over the entire dataset. Consider using a reservoir sampling algorithm for better statistical properties when the sample size is fixed but the total population is unknown or very large. ```suggestion // Position of this sample in the overall stream (1-based) long n = totalSamples.incrementAndGet(); synchronized (latencies) { int maxSamples = BenchmarkConstants.MAX_LATENCY_SAMPLES; if (latencies.size() < maxSamples) { // Fill the reservoir until it reaches the maximum size latencies.add(latencyMs); } else { // Reservoir sampling: // With probability maxSamples / n, include this sample if (ThreadLocalRandom.current().nextLong(n) < maxSamples) { int index = ThreadLocalRandom.current().nextInt(maxSamples); latencies.set(index, latencyMs); } ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/WorkloadGenerator.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.executor; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.constant.BenchmarkConstants; +import org.apache.seata.benchmark.model.BenchmarkMetrics; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.common.thread.NamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Workload generator with TPS rate limiting + */ +public class WorkloadGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadGenerator.class); + + private final BenchmarkConfig config; + private final TransactionExecutor executor; + private final BenchmarkMetrics metrics; + private final RateLimiter rateLimiter; + private final ExecutorService executorService; + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean paused = new AtomicBoolean(false); + private final List<TransactionRecord> recentRecords = Collections.synchronizedList(new 
ArrayList<>()); + + public WorkloadGenerator(BenchmarkConfig config, TransactionExecutor executor, BenchmarkMetrics metrics) { + this.config = config; + this.executor = executor; + this.metrics = metrics; + this.rateLimiter = RateLimiter.create(config.getTargetTps()); + this.executorService = new ThreadPoolExecutor( + config.getThreads(), + config.getThreads(), + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("workload-generator", config.getThreads())); + } + + public void start() { + if (running.compareAndSet(false, true)) { + LOGGER.info( + "Starting workload generator with target TPS: {}, threads: {}", + config.getTargetTps(), + config.getThreads()); + + long startTime = System.currentTimeMillis(); + long endTime = startTime + config.getDuration() * 1000L; + + for (int i = 0; i < config.getThreads(); i++) { + executorService.submit(() -> { + while (running.get() && System.currentTimeMillis() < endTime) { + if (paused.get()) { + try { + Thread.sleep(BenchmarkConstants.PAUSE_CHECK_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + continue; + } + + rateLimiter.acquire(); + executeTransaction(); + } + }); + } + + LOGGER.info("Workload generator started"); + } + } + + private void executeTransaction() { + try { + TransactionRecord record = executor.execute(); + + if (record.isSuccess()) { + metrics.recordSuccess(record.getDuration()); + } else { + metrics.recordFailure(record.getDuration()); + } + + addRecentRecord(record); + + } catch (Exception e) { + LOGGER.error("Transaction execution error", e); + metrics.recordFailure(0); + } + } + + private void addRecentRecord(TransactionRecord record) { + synchronized (recentRecords) { + recentRecords.add(0, record); + if (recentRecords.size() > BenchmarkConstants.MAX_RECENT_RECORDS) { + recentRecords.remove(recentRecords.size() - 1); + } + } + } + + public void pause() { + paused.set(true); + LOGGER.info("Workload generator paused"); 
+ } + + public void resume() { + paused.set(false); + LOGGER.info("Workload generator resumed"); + } + + public boolean isPaused() { + return paused.get(); + } + + public void waitForCompletion() { + LOGGER.info("Waiting for benchmark completion..."); + long startTime = System.currentTimeMillis(); + long duration = config.getDuration() * 1000L; + long endTime = startTime + duration; + + try { + while (System.currentTimeMillis() < endTime) { + Thread.sleep(1000); + long elapsed = (System.currentTimeMillis() - startTime) / 1000; + if (elapsed % BenchmarkConstants.PROGRESS_REPORT_INTERVAL_SECONDS == 0 && elapsed > 0) { + System.out.printf( + "[%02d:%02d] %d txns, %.1f txns/sec, %.1f%% success%n", + elapsed / 60, + elapsed % 60, + metrics.getTotalCount(), + metrics.getCurrentTps(), + metrics.getSuccessRate()); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + stop(); + } Review Comment: The warmup feature is mentioned in multiple places but the actual implementation for excluding warmup period from metrics is not present. The BenchmarkConfig has warmupDuration field and it's displayed in the output, but there's no code in WorkloadGenerator or BenchmarkMetrics to reset metrics after warmup period. This feature appears to be incomplete. ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/saga/BenchmarkServiceInvoker.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.saga; + +import org.apache.seata.saga.engine.invoker.ServiceInvoker; +import org.apache.seata.saga.statelang.domain.ServiceTaskState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Service invoker for benchmark Saga services. + * Invokes registered service methods based on service name and method name. + */ +public class BenchmarkServiceInvoker implements ServiceInvoker { + + private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkServiceInvoker.class); + + private final Map<String, Object> serviceRegistry = new HashMap<>(); + + public void registerService(String serviceName, Object service) { + serviceRegistry.put(serviceName, service); + LOGGER.debug("Registered service: {}", serviceName); + } + + @Override + public Object invoke(ServiceTaskState serviceTaskState, Object... 
input) throws Exception { + String serviceName = serviceTaskState.getServiceName(); + String methodName = serviceTaskState.getServiceMethod(); + + Object service = serviceRegistry.get(serviceName); + if (service == null) { + throw new IllegalArgumentException("Service not found: " + serviceName); + } + + // Find the method + Method method = findMethod(service.getClass(), methodName); + if (method == null) { + throw new NoSuchMethodException("Method not found: " + methodName + " in service: " + serviceName); + } + + // Prepare input parameters + Object inputParam = prepareInput(input); + + LOGGER.debug("Invoking service: {}.{}()", serviceName, methodName); + + // Invoke the method + return method.invoke(service, inputParam); + } + + private Method findMethod(Class<?> clazz, String methodName) { + for (Method method : clazz.getMethods()) { + if (method.getName().equals(methodName)) { + return method; + } + } + return null; Review Comment: The method findMethod does not validate parameter types or count. If there are multiple overloaded methods with the same name but different parameters, this will return the first match which may not be the intended method. Consider adding parameter type validation or using more specific method lookup. 
```suggestion // Prepare input parameters Object inputParam = prepareInput(input); // Determine parameter types based on prepared input Class<?>[] paramTypes; if (inputParam == null) { paramTypes = new Class<?>[] {Object.class}; } else { paramTypes = new Class<?>[] {inputParam.getClass()}; } // Find the method with matching name and parameter types Method method = findMethod(service.getClass(), methodName, paramTypes); if (method == null) { throw new NoSuchMethodException("Method not found: " + methodName + " in service: " + serviceName); } LOGGER.debug("Invoking service: {}.{}()", serviceName, methodName); // Invoke the method return method.invoke(service, inputParam); } private Method findMethod(Class<?> clazz, String methodName, Class<?>... paramTypes) { // First, try to find an exact public method match by name and parameter types try { return clazz.getMethod(methodName, paramTypes); } catch (NoSuchMethodException e) { // Fallback to manual search for a compatible method (e.g., parameter is an interface/supertype) for (Method method : clazz.getMethods()) { if (!method.getName().equals(methodName)) { continue; } Class<?>[] methodParamTypes = method.getParameterTypes(); if (methodParamTypes.length != paramTypes.length) { continue; } boolean compatible = true; for (int i = 0; i < methodParamTypes.length; i++) { Class<?> expected = methodParamTypes[i]; Class<?> provided = paramTypes[i]; if (provided != null && !expected.isAssignableFrom(provided)) { compatible = false; break; } } if (compatible) { return method; } } return null; } ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java: ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.executor; + +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker; +import org.apache.seata.benchmark.saga.InventorySagaService; +import org.apache.seata.benchmark.saga.OrderSagaService; +import org.apache.seata.benchmark.saga.PaymentSagaService; +import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.saga.engine.StateMachineEngine; +import org.apache.seata.saga.engine.config.AbstractStateMachineConfig; +import org.apache.seata.saga.engine.expression.ExpressionFactoryManager; +import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine; +import org.apache.seata.saga.statelang.domain.DomainConstants; +import org.apache.seata.saga.statelang.domain.ExecutionStatus; +import org.apache.seata.saga.statelang.domain.StateMachineInstance; +import org.apache.seata.tm.api.GlobalTransaction; +import org.apache.seata.tm.api.GlobalTransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; 
+import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN; + +/** + * Saga mode transaction executor supporting both mock and real modes + * - branches == 0: Mock mode (simplified Saga simulation without state machine) + * - branches > 0: Real mode (state machine engine with compensation support) + */ +public class SagaModeExecutor implements TransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SagaModeExecutor.class); + private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0); + + private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga"; + private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga"; + + private final BenchmarkConfig config; + private StateMachineEngine stateMachineEngine; + private BenchmarkStateMachineConfig stateMachineConfig; + + public SagaModeExecutor(BenchmarkConfig config) { + this.config = config; + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + initRealMode(); + } else { + LOGGER.info("Saga mode executor initialized (simplified mock mode)"); + LOGGER.info("Note: Full Saga annotation support requires Spring framework integration"); + } + } + + private void initRealMode() { + LOGGER.info("Initializing Real Saga mode executor with state machine engine"); + + try { + // Create and configure state machine config + stateMachineConfig = new 
BenchmarkStateMachineConfig(); + stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage()); + stateMachineConfig.init(); + + // Create state machine engine + ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine(); + engine.setStateMachineConfig(stateMachineConfig); + this.stateMachineEngine = engine; + + LOGGER.info("Real Saga mode executor initialized"); + LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, ORDER_SAGA_NAME); + + } catch (Exception e) { + throw new RuntimeException("Failed to initialize Saga state machine engine", e); + } + } + + @Override + public TransactionRecord execute() { + if (isRealMode()) { + return executeRealMode(); + } else { + return executeMockMode(); + } + } + + private TransactionRecord executeMockMode() { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + long startTime = System.currentTimeMillis(); + String xid = null; + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + tx.begin(30000, "benchmark-saga-tx"); + xid = tx.getXid(); + + // Simulate Saga actions (forward phase) + for (int i = 0; i < branchCount; i++) { + simulateSagaAction(i); + } + + if (shouldRollback()) { + // Simulate compensation (backward phase) + for (int i = branchCount - 1; i >= 0; i--) { + simulateCompensation(i); + } + tx.rollback(); + status = STATUS_COMPENSATED; + } else { + tx.commit(); + status = STATUS_COMMITTED; + success = true; + } + + } catch (TransactionException e) { + LOGGER.debug("Transaction failed: {}", e.getMessage()); + status = STATUS_FAILED; + try { + if (tx.getStatus() != GlobalStatus.Rollbacked && tx.getStatus() != GlobalStatus.RollbackFailed) { + tx.rollback(); + } + } catch (TransactionException rollbackEx) { + LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage()); + } + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(xid, status, duration, branchCount, 
success); + } + + private TransactionRecord executeRealMode() { + long startTime = System.currentTimeMillis(); + String businessKey = "benchmark-" + BUSINESS_KEY_COUNTER.incrementAndGet(); + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + // Choose state machine based on branch count + String stateMachineName = branchCount >= 3 ? ORDER_SAGA_NAME : SIMPLE_SAGA_NAME; + + // Prepare start parameters + Map<String, Object> startParams = createStartParams(); + + // Execute state machine + StateMachineInstance instance = stateMachineEngine.startWithBusinessKey( + stateMachineName, stateMachineConfig.getDefaultTenantId(), businessKey, startParams); + + // Check execution result + ExecutionStatus executionStatus = instance.getStatus(); + ExecutionStatus compensationStatus = instance.getCompensationStatus(); + + if (ExecutionStatus.SU.equals(executionStatus)) { + status = STATUS_COMMITTED; + success = true; + } else if (ExecutionStatus.FA.equals(executionStatus)) { + if (compensationStatus != null) { + if (ExecutionStatus.SU.equals(compensationStatus)) { + status = STATUS_COMPENSATED; + } else { + status = STATUS_COMPENSATION_FAILED; + } + } else { + status = STATUS_FAILED; + } + } else if (ExecutionStatus.UN.equals(executionStatus)) { + status = STATUS_UNKNOWN; + } else { + status = executionStatus != null ? 
executionStatus.name() : STATUS_UNKNOWN; + } + + } catch (Exception e) { + LOGGER.debug("Saga execution failed: {}", e.getMessage()); + status = STATUS_FAILED; + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(businessKey, status, duration, branchCount, success); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private Map<String, Object> createStartParams() { + Map<String, Object> params = new HashMap<>(); + params.put("userId", "user-" + ThreadLocalRandom.current().nextInt(1000)); + params.put("productId", "product-" + ThreadLocalRandom.current().nextInt(100)); + params.put("quantity", ThreadLocalRandom.current().nextInt(10) + 1); + params.put("amount", new BigDecimal(ThreadLocalRandom.current().nextInt(1000) + 100)); + params.put("accountId", "account-" + ThreadLocalRandom.current().nextInt(1000)); + return params; + } + + private void simulateSagaAction(int branchId) { + // Simulated Saga forward action + // In real implementation, this would be a @CompensationBusinessAction annotated method + LOGGER.trace("Executing Saga action for branch {}", branchId); + } + + private void simulateCompensation(int branchId) { + // Simulated Saga compensation action + // In real implementation, this would be the compensationMethod + LOGGER.trace("Executing compensation for branch {}", branchId); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private boolean shouldRollback() { + return ThreadLocalRandom.current().nextInt(100) < config.getRollbackPercentage(); + } + + @Override + public void destroy() { + if (isRealMode()) { + destroyRealMode(); + } + LOGGER.info("Saga mode executor destroyed"); + } + + private void destroyRealMode() { + LOGGER.info("Destroying Real Saga mode resources"); + // StateMachineEngine doesn't have a close method + stateMachineEngine = null; + stateMachineConfig = null; + } + + /** + * Custom StateMachineConfig for benchmark testing. 
+ */ + private static class BenchmarkStateMachineConfig extends AbstractStateMachineConfig { + + private int rollbackPercentage = 0; + + public void setRollbackPercentage(int rollbackPercentage) { + this.rollbackPercentage = rollbackPercentage; + } + + @Override + public void init() throws Exception { + // Load state machine definitions from classpath + try (InputStream simpleSagaStream = getClass() + .getClassLoader() + .getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json"); + InputStream orderSagaStream = getClass() + .getClassLoader() + .getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json")) { + + if (simpleSagaStream == null || orderSagaStream == null) { + throw new RuntimeException("Failed to load Saga state machine definitions from classpath"); + } + + // Read streams to byte arrays before closing, as super.init() may process them asynchronously + byte[] simpleBytes = readAllBytes(simpleSagaStream); + byte[] orderBytes = readAllBytes(orderSagaStream); + + setStateMachineDefInputStreamArray( + new InputStream[] {new ByteArrayInputStream(simpleBytes), new ByteArrayInputStream(orderBytes) + }); + + // Initialize parent config + super.init(); + + // Register SpEL expression factory for parameter evaluation + ExpressionFactoryManager expressionFactoryManager = getExpressionFactoryManager(); + SimpleSpelExpressionFactory spelExpressionFactory = new SimpleSpelExpressionFactory(); + // Register for default type (when expression doesn't start with $) + expressionFactoryManager.putExpressionFactory( + ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, spelExpressionFactory); + // Register for empty type (when expression is like $.xxx where type is empty string) + expressionFactoryManager.putExpressionFactory("", spelExpressionFactory); + + // Register benchmark services with the service invoker manager + BenchmarkServiceInvoker serviceInvoker = new BenchmarkServiceInvoker(); + + // Register services with configured rollback percentage + 
// Divide rollback percentage by 3 for each service so total probability is approximately correct + int serviceRollbackPct = rollbackPercentage > 0 ? Math.max(1, rollbackPercentage / 3) : 0; Review Comment: The division by 3 for service rollback percentage (line 311) may not accurately reflect the intended overall rollback percentage. If any one of the three services fails, the entire saga will fail. The probability calculation should account for this compound probability. For independent failures, the overall success rate would be approximately (1 - p1) * (1 - p2) * (1 - p3), not a simple division. Consider using 1 - (1 - targetRate)^(1/3) for each service to achieve the target overall rollback rate. ```suggestion // Compute per-service rollback percentage so that the combined rollback rate across 3 services // matches the target rollbackPercentage (assuming independent failures): // perServiceRate = 1 - (1 - targetRate)^(1/3) int serviceRollbackPct; if (rollbackPercentage > 0) { double targetRate = rollbackPercentage / 100.0; double perServiceRate = 1 - Math.pow(1 - targetRate, 1.0 / 3.0); serviceRollbackPct = Math.max(1, (int) Math.round(perServiceRate * 100)); } else { serviceRollbackPct = 0; } ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/config/BenchmarkConfig.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.config; + +import org.apache.seata.core.model.BranchType; + +/** + * Benchmark configuration + */ +public class BenchmarkConfig { + + private String server = "127.0.0.1:8091"; + private BranchType mode = BranchType.AT; + private int targetTps = 100; + private int threads = 10; + private int duration = 60; + private int warmupDuration = 0; + private String applicationId = "benchmark-app"; + private String txServiceGroup = "default_tx_group"; + private int rollbackPercentage = 2; + private int branches = 0; + + public BranchType getMode() { + return mode; + } + + public void setMode(BranchType mode) { + this.mode = mode; + } + + public void setMode(String mode) { + this.mode = BranchType.get(mode); Review Comment: Missing validation for the 'mode' parameter conversion. If an invalid mode string is provided via YAML configuration, BranchType.get() may return null or throw an exception. The setMode(String) method should validate the input and throw a meaningful exception for invalid values. 
```suggestion if (mode == null || mode.trim().isEmpty()) { throw new IllegalArgumentException("Mode must not be null or empty."); } BranchType branchType; try { branchType = BranchType.get(mode); } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Invalid mode value: " + mode, e); } if (branchType == null) { throw new IllegalArgumentException("Unsupported mode value: " + mode); } this.mode = branchType; ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/WorkloadGenerator.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.benchmark.executor; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.constant.BenchmarkConstants; +import org.apache.seata.benchmark.model.BenchmarkMetrics; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.common.thread.NamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Workload generator with TPS rate limiting + */ +public class WorkloadGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadGenerator.class); + + private final BenchmarkConfig config; + private final TransactionExecutor executor; + private final BenchmarkMetrics metrics; + private final RateLimiter rateLimiter; + private final ExecutorService executorService; + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean paused = new AtomicBoolean(false); + private final List<TransactionRecord> recentRecords = Collections.synchronizedList(new ArrayList<>()); + + public WorkloadGenerator(BenchmarkConfig config, TransactionExecutor executor, BenchmarkMetrics metrics) { + this.config = config; + this.executor = executor; + this.metrics = metrics; + this.rateLimiter = RateLimiter.create(config.getTargetTps()); + this.executorService = new ThreadPoolExecutor( + config.getThreads(), + config.getThreads(), + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("workload-generator", config.getThreads())); + } + + public void start() { + if (running.compareAndSet(false, true)) { + LOGGER.info( + 
"Starting workload generator with target TPS: {}, threads: {}", + config.getTargetTps(), + config.getThreads()); + + long startTime = System.currentTimeMillis(); + long endTime = startTime + config.getDuration() * 1000L; + + for (int i = 0; i < config.getThreads(); i++) { + executorService.submit(() -> { + while (running.get() && System.currentTimeMillis() < endTime) { + if (paused.get()) { + try { + Thread.sleep(BenchmarkConstants.PAUSE_CHECK_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + continue; + } + + rateLimiter.acquire(); + executeTransaction(); + } + }); + } + + LOGGER.info("Workload generator started"); + } + } + + private void executeTransaction() { + try { + TransactionRecord record = executor.execute(); + + if (record.isSuccess()) { + metrics.recordSuccess(record.getDuration()); + } else { + metrics.recordFailure(record.getDuration()); + } + + addRecentRecord(record); + + } catch (Exception e) { + LOGGER.error("Transaction execution error", e); + metrics.recordFailure(0); + } + } + + private void addRecentRecord(TransactionRecord record) { + synchronized (recentRecords) { + recentRecords.add(0, record); + if (recentRecords.size() > BenchmarkConstants.MAX_RECENT_RECORDS) { + recentRecords.remove(recentRecords.size() - 1); + } + } + } + + public void pause() { + paused.set(true); + LOGGER.info("Workload generator paused"); + } + + public void resume() { + paused.set(false); + LOGGER.info("Workload generator resumed"); + } + + public boolean isPaused() { + return paused.get(); + } + + public void waitForCompletion() { + LOGGER.info("Waiting for benchmark completion..."); + long startTime = System.currentTimeMillis(); + long duration = config.getDuration() * 1000L; + long endTime = startTime + duration; + + try { + while (System.currentTimeMillis() < endTime) { + Thread.sleep(1000); + long elapsed = (System.currentTimeMillis() - startTime) / 1000; + if (elapsed % 
BenchmarkConstants.PROGRESS_REPORT_INTERVAL_SECONDS == 0 && elapsed > 0) { + System.out.printf( + "[%02d:%02d] %d txns, %.1f txns/sec, %.1f%% success%n", + elapsed / 60, + elapsed % 60, + metrics.getTotalCount(), + metrics.getCurrentTps(), + metrics.getSuccessRate()); + } Review Comment: The progress reporting timing may be inaccurate due to sleep drift. The condition 'elapsed % PROGRESS_REPORT_INTERVAL_SECONDS == 0' only checks if elapsed time is a multiple of 10, but with 1-second sleep intervals, this might miss the exact moment or report twice if processing time causes drift. Consider tracking the last report time explicitly to ensure reports happen exactly every 10 seconds. ########## test-suite/seata-benchmark-cli/README.md: ########## @@ -0,0 +1,434 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +# Seata Benchmark CLI Tool + +A command-line benchmark tool for stress testing Seata transaction modes. 
+ +## Features + +- Support for **AT**, **TCC**, and **SAGA** transaction modes +- **Dual execution modes**: + - **Empty mode** (`--branches 0`): Pure Seata protocol overhead testing + - **Real mode** (`--branches N`): Actual distributed transaction execution +- **Configurable TPS** (Transactions Per Second) control +- **Multi-threaded** workload generation +- **Fault injection** with configurable rollback percentage +- **Window-based progress reporting** (every 10 seconds) +- Performance metrics collection (latency percentiles, success rate, TPS) +- **CSV export** for post-analysis +- **Warmup support** (exclude initial ramp-up from final statistics) +- **YAML configuration file** support + +## Prerequisites + +- JDK 8 or higher +- Maven 3.6+ +- Running Seata Server +- Docker (required for real mode with Testcontainers) + +## Build + +```bash +cd test-suite/seata-benchmark-cli +../../mvnw clean package +``` + +The executable JAR will be created at `target/seata-benchmark-cli.jar` + +## Usage + +### Basic Usage + +```bash +# AT mode benchmark (empty transaction) +java -jar seata-benchmark-cli.jar \ + --server 127.0.0.1:8091 \ + --mode AT \ + --tps 100 \ + --duration 60 + +# TCC mode benchmark +java -jar seata-benchmark-cli.jar \ + --server 127.0.0.1:8091 \ + --mode TCC \ + --tps 200 \ + --threads 20 \ + --duration 120 + +# SAGA mode benchmark (empty transaction) +java -jar seata-benchmark-cli.jar \ + --server 127.0.0.1:8091 \ + --mode SAGA \ + --tps 100 \ + --duration 60 +``` + +### Real Mode (with actual database operations) + +```bash +# AT mode with real MySQL transactions (via Testcontainers) +java -jar seata-benchmark-cli.jar \ + --server 127.0.0.1:8091 \ + --mode AT \ + --tps 100 \ + --duration 60 \ + --branches 3 + +# SAGA mode with state machine engine +java -jar seata-benchmark-cli.jar \ + --server 127.0.0.1:8091 \ + --mode SAGA \ + --tps 100 \ + --duration 60 \ + --branches 3 \ + --rollback-percentage 5 +``` + +### Advanced Options + +```bash +java -jar 
seata-benchmark-cli.jar \ + --server 127.0.0.1:8091 \ + --mode AT \ + --tps 500 \ + --threads 50 \ + --duration 300 \ + --warmup-duration 30 \ + --rollback-percentage 2 \ + --branches 3 \ + --export-csv results.csv \ + --application-id my-benchmark-app \ + --tx-service-group my_tx_group +``` + +### All CLI Options + +``` +Usage: seata-benchmark [-hV] [--application-id=<applicationId>] + [-d=<duration>] [--export-csv=<exportCsv>] + [-m=<mode>] [-s=<server>] [-t=<targetTps>] + [--threads=<threads>] [--tx-service-group=<txServiceGroup>] + [--warmup-duration=<warmupDuration>] + [--rollback-percentage=<rollbackPercentage>] + [--branches=<branches>] + +Options: + -s, --server=<server> Seata Server address (host:port) + -m, --mode=<mode> Transaction mode: AT, TCC, or SAGA + -t, --tps=<targetTps> Target TPS (default: 100) + --threads=<threads> Concurrent threads (default: 10) + -d, --duration=<duration> Duration in seconds (default: 60) + --warmup-duration=<warmupDuration> + Warmup duration in seconds (default: 0) + --rollback-percentage=<rollbackPercentage> + Rollback percentage for fault injection (0-100, default: 2) + --branches=<branches> Number of branch transactions + 0 = empty mode (protocol overhead only) + >=1 = real mode (actual execution) + --export-csv=<exportCsv> Export metrics to CSV file + --application-id=<applicationId> + Seata application ID (default: benchmark-app) + --tx-service-group=<txServiceGroup> + Seata tx service group (default: default_tx_group) + -h, --help Show this help message + -V, --version Print version information +``` + +## Configuration File + +The benchmark tool supports YAML configuration files. Configuration priority (highest to lowest): + +1. CLI arguments +2. Environment variable `BENCHMARK_CONFIG_FILE` +3. System property `benchmark.config.file` +4. 
Default classpath `benchmark.yaml` + +### Example Configuration + +```yaml +# benchmark.yaml +server: 127.0.0.1:8091 +mode: AT +targetTps: 100 +threads: 10 +duration: 60 +warmupDuration: 10 +rollbackPercentage: 2 +branches: 0 +applicationId: benchmark-app +txServiceGroup: default_tx_group +``` + +## Output + +### Console Progress + +During execution, the tool displays progress every 10 seconds: + +``` +Starting benchmark... + +[00:10] 1000 txns, 100.2 txns/sec, 99.0% success +[00:20] 2000 txns, 100.1 txns/sec, 99.2% success +[00:30] 3000 txns, 99.8 txns/sec, 99.5% success +... +``` + +### Final Report + +When the benchmark completes, a final report is displayed: + +``` +=================================================== + Seata Benchmark Final Report +=================================================== +Total Transactions: 6,000 +Success Count: 5,940 +Failed Count: 60 +Success Rate: 99.00% +Average TPS: 100.2 +Elapsed Time: 60 seconds + +Latency Statistics: + P50: 12 ms + P95: 45 ms + P99: 89 ms + Max: 230 ms +=================================================== +``` + +### CSV Export + +Use `--export-csv` to export metrics: + +```bash +java -jar seata-benchmark-cli.jar \ + --server 127.0.0.1:8091 \ + --mode AT \ + --tps 100 \ + --duration 60 \ + --export-csv results.csv +``` + +Output format: + +```csv +Metric,Value +Total Transactions,6000 +Success Count,5940 +Failed Count,60 +Success Rate (%),99.00 +Average TPS,100.2 +Elapsed Time (s),60 +Latency P50 (ms),12 +Latency P95 (ms),45 +Latency P99 (ms),89 +Latency Max (ms),230 +Export Timestamp,2025-12-01T10:30:45Z Review Comment: The timestamp format in the README example shows '2025-12-01T10:30:45Z' but the actual code in MetricsCollector uses 'yyyy-MM-dd HH:mm:ss' format without timezone information. This documentation inconsistency should be fixed to match the actual implementation. 
```suggestion Export Timestamp,2025-12-01 10:30:45 ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java: ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.benchmark.executor; + +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker; +import org.apache.seata.benchmark.saga.InventorySagaService; +import org.apache.seata.benchmark.saga.OrderSagaService; +import org.apache.seata.benchmark.saga.PaymentSagaService; +import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.saga.engine.StateMachineEngine; +import org.apache.seata.saga.engine.config.AbstractStateMachineConfig; +import org.apache.seata.saga.engine.expression.ExpressionFactoryManager; +import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine; +import org.apache.seata.saga.statelang.domain.DomainConstants; +import org.apache.seata.saga.statelang.domain.ExecutionStatus; +import org.apache.seata.saga.statelang.domain.StateMachineInstance; +import org.apache.seata.tm.api.GlobalTransaction; +import org.apache.seata.tm.api.GlobalTransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED; +import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN; + +/** + * Saga mode transaction executor supporting both mock and real modes + * - branches == 0: Mock mode (simplified Saga simulation without state machine) + * - branches > 0: Real mode (state machine engine with compensation support) + */ +public class SagaModeExecutor implements TransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SagaModeExecutor.class); + private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0); + + private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga"; + private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga"; + + private final BenchmarkConfig config; + private StateMachineEngine stateMachineEngine; + private BenchmarkStateMachineConfig stateMachineConfig; + + public SagaModeExecutor(BenchmarkConfig config) { + this.config = config; + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + initRealMode(); + } else { + LOGGER.info("Saga mode executor initialized (simplified mock mode)"); + LOGGER.info("Note: Full Saga annotation support requires Spring framework integration"); + } + } + + private void initRealMode() { + LOGGER.info("Initializing Real Saga mode executor with state machine engine"); + + try { + // Create and configure state machine config + stateMachineConfig = new BenchmarkStateMachineConfig(); + stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage()); + stateMachineConfig.init(); + + // Create state machine engine + ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine(); + engine.setStateMachineConfig(stateMachineConfig); + this.stateMachineEngine = engine; + + LOGGER.info("Real Saga mode executor initialized"); + LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, ORDER_SAGA_NAME); + + } catch (Exception e) { + throw new RuntimeException("Failed to 
initialize Saga state machine engine", e); + } + } + + @Override + public TransactionRecord execute() { + if (isRealMode()) { + return executeRealMode(); + } else { + return executeMockMode(); + } + } + + private TransactionRecord executeMockMode() { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + long startTime = System.currentTimeMillis(); + String xid = null; + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + tx.begin(30000, "benchmark-saga-tx"); + xid = tx.getXid(); + + // Simulate Saga actions (forward phase) + for (int i = 0; i < branchCount; i++) { + simulateSagaAction(i); + } + + if (shouldRollback()) { + // Simulate compensation (backward phase) + for (int i = branchCount - 1; i >= 0; i--) { + simulateCompensation(i); + } + tx.rollback(); + status = STATUS_COMPENSATED; + } else { + tx.commit(); + status = STATUS_COMMITTED; + success = true; + } + + } catch (TransactionException e) { + LOGGER.debug("Transaction failed: {}", e.getMessage()); + status = STATUS_FAILED; + try { + if (tx.getStatus() != GlobalStatus.Rollbacked && tx.getStatus() != GlobalStatus.RollbackFailed) { + tx.rollback(); + } + } catch (TransactionException rollbackEx) { + LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage()); + } + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(xid, status, duration, branchCount, success); + } + + private TransactionRecord executeRealMode() { + long startTime = System.currentTimeMillis(); + String businessKey = "benchmark-" + BUSINESS_KEY_COUNTER.incrementAndGet(); + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + // Choose state machine based on branch count + String stateMachineName = branchCount >= 3 ? 
ORDER_SAGA_NAME : SIMPLE_SAGA_NAME; + + // Prepare start parameters + Map<String, Object> startParams = createStartParams(); + + // Execute state machine + StateMachineInstance instance = stateMachineEngine.startWithBusinessKey( + stateMachineName, stateMachineConfig.getDefaultTenantId(), businessKey, startParams); + + // Check execution result + ExecutionStatus executionStatus = instance.getStatus(); + ExecutionStatus compensationStatus = instance.getCompensationStatus(); + + if (ExecutionStatus.SU.equals(executionStatus)) { + status = STATUS_COMMITTED; + success = true; + } else if (ExecutionStatus.FA.equals(executionStatus)) { + if (compensationStatus != null) { + if (ExecutionStatus.SU.equals(compensationStatus)) { + status = STATUS_COMPENSATED; + } else { + status = STATUS_COMPENSATION_FAILED; + } + } else { + status = STATUS_FAILED; + } + } else if (ExecutionStatus.UN.equals(executionStatus)) { + status = STATUS_UNKNOWN; + } else { + status = executionStatus != null ? executionStatus.name() : STATUS_UNKNOWN; + } + + } catch (Exception e) { + LOGGER.debug("Saga execution failed: {}", e.getMessage()); + status = STATUS_FAILED; + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(businessKey, status, duration, branchCount, success); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") Review Comment: The suppress annotation with LGTM tag references a deprecated static analysis tool (LGTM.com). Consider using modern alternatives like CodeQL or removing these annotations. ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/config/BenchmarkConfigLoader.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.config; + +import org.apache.seata.common.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.LoaderOptions; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Configuration loader for benchmark + */ +public class BenchmarkConfigLoader { + + private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkConfigLoader.class); + private static final String DEFAULT_CONFIG_FILE = "benchmark.yaml"; + private static final String ENV_CONFIG_FILE = "BENCHMARK_CONFIG_FILE"; + private static final String SYS_CONFIG_FILE = "benchmark.config.file"; + + public static BenchmarkConfig load(String configFile) throws IOException { + BenchmarkConfig config; + String configSource; + + if (configFile != null && !configFile.isEmpty()) { + config = loadFromFile(configFile); + configSource = "CLI argument: " + configFile; + } else { + String envConfig = System.getenv(ENV_CONFIG_FILE); + if (envConfig != null && !envConfig.isEmpty()) { + config = loadFromFile(envConfig); + configSource = "Environment variable " + ENV_CONFIG_FILE + ": " + envConfig; + } else { + String sysConfig = System.getProperty(SYS_CONFIG_FILE); + if (sysConfig != null && !sysConfig.isEmpty()) { + config = 
loadFromFile(sysConfig); + configSource = "System property " + SYS_CONFIG_FILE + ": " + sysConfig; + } else { + config = loadDefault(); + configSource = "Default classpath: " + DEFAULT_CONFIG_FILE; + } + } + } + + LOGGER.info("Loading configuration from: {}", configSource); + config.validate(); + return config; + } + + private static BenchmarkConfig loadFromFile(String filePath) throws IOException { + try (FileInputStream fis = new FileInputStream(filePath)) { + return loadFromStream(fis); + } + } + + public static BenchmarkConfig loadDefault() throws IOException { + try (InputStream is = BenchmarkConfigLoader.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG_FILE)) { + if (is == null) { + LOGGER.warn("Default configuration file not found, using built-in defaults"); + return new BenchmarkConfig(); + } + return loadFromStream(is); + } + } + + private static BenchmarkConfig loadFromStream(InputStream inputStream) { + LoaderOptions loaderOptions = new LoaderOptions(); Review Comment: The SafeConstructor is used but LoaderOptions is created without any specific configuration. For security, consider explicitly setting code point limits or other security-related options in LoaderOptions to prevent potential YAML bomb attacks or other malicious YAML constructs. ```suggestion LoaderOptions loaderOptions = new LoaderOptions(); // Set conservative limits to avoid YAML bombs and other malicious payloads loaderOptions.setCodePointLimit(1_000_000); loaderOptions.setMaxAliasesForCollections(50); ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/model/BenchmarkMetrics.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.model; + +import org.apache.seata.benchmark.constant.BenchmarkConstants; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Benchmark performance metrics + */ +public class BenchmarkMetrics { + + private final AtomicLong totalCount = new AtomicLong(0); + private final AtomicLong successCount = new AtomicLong(0); + private final AtomicLong failedCount = new AtomicLong(0); + private final List<Long> latencies = + Collections.synchronizedList(new ArrayList<>(BenchmarkConstants.MAX_LATENCY_SAMPLES)); + private final AtomicLong totalSamples = new AtomicLong(0); + private final long startTime = System.currentTimeMillis(); + + private volatile long lastCountSnapshot = 0; + private volatile long lastSnapshotTime = System.currentTimeMillis(); + + private volatile LatencyStats cachedStats = null; + private volatile long lastStatsUpdateTime = 0; + + public void recordSuccess(long latencyMs) { + totalCount.incrementAndGet(); + successCount.incrementAndGet(); + addLatencySample(latencyMs); + } + + public void recordFailure(long latencyMs) { + totalCount.incrementAndGet(); + failedCount.incrementAndGet(); + addLatencySample(latencyMs); + } + + private void addLatencySample(long latencyMs) { + totalSamples.incrementAndGet(); 
+ synchronized (latencies) { + if (latencies.size() < BenchmarkConstants.MAX_LATENCY_SAMPLES) { + latencies.add(latencyMs); + } else { + // Random replacement strategy to maintain bounded samples + int index = ThreadLocalRandom.current().nextInt(BenchmarkConstants.MAX_LATENCY_SAMPLES); + latencies.set(index, latencyMs); + } + } + } + + public long getTotalCount() { + return totalCount.get(); + } + + public long getSuccessCount() { + return successCount.get(); + } + + public long getFailedCount() { + return failedCount.get(); + } + + public double getSuccessRate() { + long total = totalCount.get(); + if (total == 0) { + return 0.0; + } + return (double) successCount.get() / total * 100; + } + + public double getAverageTps() { + long elapsed = System.currentTimeMillis() - startTime; + if (elapsed == 0) { + return 0.0; + } + return (double) totalCount.get() / elapsed * 1000; + } + + public synchronized double getCurrentTps() { + long currentTime = System.currentTimeMillis(); + long currentCount = totalCount.get(); + long elapsed = currentTime - lastSnapshotTime; + + if (elapsed == 0) { + return 0.0; + } + + double tps = (double) (currentCount - lastCountSnapshot) / elapsed * 1000; + lastSnapshotTime = currentTime; + lastCountSnapshot = currentCount; + return tps; + } + + public long getElapsedTimeSeconds() { + return (System.currentTimeMillis() - startTime) / 1000; + } + + public LatencyStats getLatencyStats() { + long now = System.currentTimeMillis(); + if (cachedStats != null && (now - lastStatsUpdateTime) < BenchmarkConstants.LATENCY_STATS_CACHE_MS) { + return cachedStats; + } + + synchronized (this) { + // Double-check + if (cachedStats != null && (now - lastStatsUpdateTime) < BenchmarkConstants.LATENCY_STATS_CACHE_MS) { + return cachedStats; + } + + if (latencies.isEmpty()) { + cachedStats = new LatencyStats(0, 0, 0, 0); + } else { + List<Long> sortedLatencies = new ArrayList<>(latencies); + Collections.sort(sortedLatencies); + + int size = 
sortedLatencies.size(); + long p50 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.5) - 1)); + long p95 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.95) - 1)); + long p99 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.99) - 1)); Review Comment: The percentile calculation uses Math.ceil which may result in incorrect percentile values. For percentiles, the standard approach is to use Math.floor or proper interpolation. For example, for P50 with 100 elements, you'd want element at index 49 (0-indexed) or 50 (1-indexed), but Math.ceil(100 * 0.5) - 1 = 49, which is correct by chance. However, for P95 with 100 elements, Math.ceil(100 * 0.95) - 1 = 94, when it should be 95. Consider using the proper formula: (int)((size - 1) * percentile) for 0-indexed arrays. ```suggestion int idx50 = (int) ((size - 1) * 0.5); int idx95 = (int) ((size - 1) * 0.95); int idx99 = (int) ((size - 1) * 0.99); long p50 = sortedLatencies.get(idx50); long p95 = sortedLatencies.get(idx95); long p99 = sortedLatencies.get(idx99); ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/model/BenchmarkMetrics.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.benchmark.model; + +import org.apache.seata.benchmark.constant.BenchmarkConstants; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Benchmark performance metrics + */ +public class BenchmarkMetrics { + + private final AtomicLong totalCount = new AtomicLong(0); + private final AtomicLong successCount = new AtomicLong(0); + private final AtomicLong failedCount = new AtomicLong(0); + private final List<Long> latencies = + Collections.synchronizedList(new ArrayList<>(BenchmarkConstants.MAX_LATENCY_SAMPLES)); + private final AtomicLong totalSamples = new AtomicLong(0); + private final long startTime = System.currentTimeMillis(); + + private volatile long lastCountSnapshot = 0; + private volatile long lastSnapshotTime = System.currentTimeMillis(); + + private volatile LatencyStats cachedStats = null; + private volatile long lastStatsUpdateTime = 0; + + public void recordSuccess(long latencyMs) { + totalCount.incrementAndGet(); + successCount.incrementAndGet(); + addLatencySample(latencyMs); + } + + public void recordFailure(long latencyMs) { + totalCount.incrementAndGet(); + failedCount.incrementAndGet(); + addLatencySample(latencyMs); + } + + private void addLatencySample(long latencyMs) { + totalSamples.incrementAndGet(); + synchronized (latencies) { + if (latencies.size() < BenchmarkConstants.MAX_LATENCY_SAMPLES) { + latencies.add(latencyMs); + } else { + // Random replacement strategy to maintain bounded samples + int index = ThreadLocalRandom.current().nextInt(BenchmarkConstants.MAX_LATENCY_SAMPLES); + latencies.set(index, latencyMs); + } + } + } + + public long getTotalCount() { + return totalCount.get(); + } + + public long getSuccessCount() { + return successCount.get(); + } + + public long getFailedCount() { + return failedCount.get(); + } + + public double getSuccessRate() { + long total = 
totalCount.get(); + if (total == 0) { + return 0.0; + } + return (double) successCount.get() / total * 100; + } + + public double getAverageTps() { + long elapsed = System.currentTimeMillis() - startTime; + if (elapsed == 0) { + return 0.0; + } + return (double) totalCount.get() / elapsed * 1000; + } + + public synchronized double getCurrentTps() { + long currentTime = System.currentTimeMillis(); + long currentCount = totalCount.get(); + long elapsed = currentTime - lastSnapshotTime; + + if (elapsed == 0) { + return 0.0; + } + + double tps = (double) (currentCount - lastCountSnapshot) / elapsed * 1000; + lastSnapshotTime = currentTime; + lastCountSnapshot = currentCount; + return tps; + } + + public long getElapsedTimeSeconds() { + return (System.currentTimeMillis() - startTime) / 1000; + } + + public LatencyStats getLatencyStats() { + long now = System.currentTimeMillis(); + if (cachedStats != null && (now - lastStatsUpdateTime) < BenchmarkConstants.LATENCY_STATS_CACHE_MS) { + return cachedStats; + } + + synchronized (this) { + // Double-check + if (cachedStats != null && (now - lastStatsUpdateTime) < BenchmarkConstants.LATENCY_STATS_CACHE_MS) { + return cachedStats; + } + + if (latencies.isEmpty()) { + cachedStats = new LatencyStats(0, 0, 0, 0); + } else { + List<Long> sortedLatencies = new ArrayList<>(latencies); + Collections.sort(sortedLatencies); + + int size = sortedLatencies.size(); + long p50 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.5) - 1)); + long p95 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.95) - 1)); + long p99 = sortedLatencies.get(Math.max(0, (int) Math.ceil(size * 0.99) - 1)); + long max = sortedLatencies.get(size - 1); + + cachedStats = new LatencyStats(p50, p95, p99, max); + } + + lastStatsUpdateTime = now; + return cachedStats; + } + } Review Comment: The double-checked locking pattern here doesn't require explicit 'synchronized' on the method since the cached stats are volatile and the check is only for 
caching optimization. The synchronized block inside is sufficient for thread safety when updating the cache. The current implementation is correct, but it could be more performant if the outer 'synchronized' keyword were removed from the method signature and only the inner synchronized block were kept. ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/ATModeExecutor.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.benchmark.executor; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.constant.BenchmarkConstants; +import org.apache.seata.rm.datasource.DataSourceProxy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.ThreadLocalRandom; + +/** + * AT mode transaction executor supporting both empty and real transaction modes + * - branches == 0: Empty transaction mode (pure Seata protocol overhead testing) + * - branches > 0: Real mode with MySQL database (via Testcontainers) + */ +public class ATModeExecutor extends AbstractTransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(ATModeExecutor.class); + + private MySQLContainer<?> mysqlContainer; + private HikariDataSource rawDataSource; + private DataSourceProxy dataSourceProxy; + + public ATModeExecutor(BenchmarkConfig config) { + super(config); + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + LOGGER.info("Initializing AT mode executor (MySQL via Testcontainers)"); + initRealMode(); + } else { + LOGGER.info("AT mode executor initialized (empty transaction mode)"); + } + } + + private void initRealMode() { + // Start MySQL container + startMySQLContainer(); + + // Create HikariCP connection pool + createDataSource(); + + // Initialize database schema and data + initDatabase(); + + // Wrap with Seata DataSourceProxy for AT mode + dataSourceProxy = new DataSourceProxy(rawDataSource); + + LOGGER.info("DataSourceProxy initialized, dbType: {}", dataSourceProxy.getDbType()); + 
LOGGER.info("Real AT mode executor initialized with {} accounts", BenchmarkConstants.ACCOUNT_COUNT); + } + + private void startMySQLContainer() { + LOGGER.info("Starting MySQL container..."); + mysqlContainer = new MySQLContainer<>(DockerImageName.parse("mysql:8.0")) + .withDatabaseName("benchmark") + .withUsername("test") + .withPassword("test") + .withCommand("--character-set-server=utf8mb4", "--collation-server=utf8mb4_unicode_ci"); + + mysqlContainer.start(); + + LOGGER.info("MySQL container started: {}", mysqlContainer.getJdbcUrl()); + } + + private void createDataSource() { + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setJdbcUrl(mysqlContainer.getJdbcUrl()); + hikariConfig.setUsername(mysqlContainer.getUsername()); + hikariConfig.setPassword(mysqlContainer.getPassword()); + hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver"); + hikariConfig.setMaximumPoolSize(config.getThreads() * 2); + hikariConfig.setMinimumIdle(config.getThreads()); + hikariConfig.setConnectionTimeout(30000); + hikariConfig.setIdleTimeout(600000); + hikariConfig.setMaxLifetime(1800000); + + rawDataSource = new HikariDataSource(hikariConfig); + LOGGER.info("HikariCP DataSource created"); + } + + private void initDatabase() { + try (Connection conn = rawDataSource.getConnection(); + Statement stmt = conn.createStatement()) { + + // Create accounts table + stmt.execute("CREATE TABLE IF NOT EXISTS accounts (" + + "id BIGINT PRIMARY KEY, " + + "balance INT NOT NULL, " + + "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"); + + // Create undo_log table for Seata AT mode (MySQL syntax) + stmt.execute("CREATE TABLE IF NOT EXISTS undo_log (" + + "branch_id BIGINT NOT NULL, " + + "xid VARCHAR(128) NOT NULL, " + + "context VARCHAR(128) NOT NULL, " + + "rollback_info LONGBLOB NOT NULL, " + + "log_status INT NOT NULL, " + + "log_created DATETIME(6) NOT NULL, " + + "log_modified DATETIME(6) NOT NULL, " + 
+ "UNIQUE KEY ux_undo_log (xid, branch_id)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"); + + // Insert test data + stmt.execute("TRUNCATE TABLE accounts"); + try (PreparedStatement pstmt = conn.prepareStatement("INSERT INTO accounts (id, balance) VALUES (?, ?)")) { + for (int i = 1; i <= BenchmarkConstants.ACCOUNT_COUNT; i++) { + pstmt.setLong(1, i); + pstmt.setInt(2, BenchmarkConstants.INITIAL_BALANCE); + pstmt.addBatch(); + if (i % 100 == 0) { + pstmt.executeBatch(); + } + } + pstmt.executeBatch(); + } + + LOGGER.info( + "Database initialized: {} accounts with balance {}", + BenchmarkConstants.ACCOUNT_COUNT, + BenchmarkConstants.INITIAL_BALANCE); + + } catch (SQLException e) { + throw new RuntimeException("Failed to initialize database", e); + } + } + + @Override + protected String getTransactionName() { + return isRealMode() ? "benchmark-real-at-tx" : "benchmark-at-tx"; + } + + @Override + protected int getBranchCount() { + return config.getBranches(); + } + + @Override + protected void executeBusinessLogic() throws Exception { + if (isRealMode()) { + executeBranchOperations(config.getBranches()); + } + // Empty mode: do nothing (pure Seata protocol overhead testing) + } + + private void executeBranchOperations(int branchCount) throws SQLException { + // Execute N branch operations (simulating distributed transaction branches) + for (int i = 0; i < branchCount; i++) { + executeSingleBranch(); + } + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private void executeSingleBranch() throws SQLException { + // Use DataSourceProxy connection to enable AT mode + try (Connection conn = dataSourceProxy.getConnection()) { + conn.setAutoCommit(false); + Review Comment: The suppress annotation with LGTM tag is for a deprecated static analysis tool (LGTM.com, which was shut down in 2022). Consider using modern alternatives like CodeQL or removing these annotations as they serve no purpose. 
```suggestion private void executeSingleBranch() throws SQLException { // Use DataSourceProxy connection to enable AT mode try (Connection conn = dataSourceProxy.getConnection()) { conn.setAutoCommit(false); ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/WorkloadGenerator.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.benchmark.executor; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.constant.BenchmarkConstants; +import org.apache.seata.benchmark.model.BenchmarkMetrics; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.common.thread.NamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Workload generator with TPS rate limiting + */ +public class WorkloadGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadGenerator.class); + + private final BenchmarkConfig config; + private final TransactionExecutor executor; + private final BenchmarkMetrics metrics; + private final RateLimiter rateLimiter; + private final ExecutorService executorService; + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean paused = new AtomicBoolean(false); + private final List<TransactionRecord> recentRecords = Collections.synchronizedList(new ArrayList<>()); + + public WorkloadGenerator(BenchmarkConfig config, TransactionExecutor executor, BenchmarkMetrics metrics) { + this.config = config; + this.executor = executor; + this.metrics = metrics; + this.rateLimiter = RateLimiter.create(config.getTargetTps()); + this.executorService = new ThreadPoolExecutor( + config.getThreads(), + config.getThreads(), + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("workload-generator", config.getThreads())); + } + + public void start() { + if (running.compareAndSet(false, true)) { + LOGGER.info( + 
"Starting workload generator with target TPS: {}, threads: {}", + config.getTargetTps(), + config.getThreads()); + + long startTime = System.currentTimeMillis(); + long endTime = startTime + config.getDuration() * 1000L; + + for (int i = 0; i < config.getThreads(); i++) { + executorService.submit(() -> { + while (running.get() && System.currentTimeMillis() < endTime) { + if (paused.get()) { + try { + Thread.sleep(BenchmarkConstants.PAUSE_CHECK_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + continue; + } + + rateLimiter.acquire(); + executeTransaction(); + } + }); + } + + LOGGER.info("Workload generator started"); + } + } + + private void executeTransaction() { + try { + TransactionRecord record = executor.execute(); + + if (record.isSuccess()) { + metrics.recordSuccess(record.getDuration()); + } else { + metrics.recordFailure(record.getDuration()); + } + + addRecentRecord(record); + + } catch (Exception e) { + LOGGER.error("Transaction execution error", e); + metrics.recordFailure(0); + } + } + + private void addRecentRecord(TransactionRecord record) { + synchronized (recentRecords) { + recentRecords.add(0, record); + if (recentRecords.size() > BenchmarkConstants.MAX_RECENT_RECORDS) { + recentRecords.remove(recentRecords.size() - 1); + } + } + } + + public void pause() { + paused.set(true); + LOGGER.info("Workload generator paused"); + } + + public void resume() { + paused.set(false); + LOGGER.info("Workload generator resumed"); + } + + public boolean isPaused() { + return paused.get(); + } + + public void waitForCompletion() { + LOGGER.info("Waiting for benchmark completion..."); + long startTime = System.currentTimeMillis(); + long duration = config.getDuration() * 1000L; + long endTime = startTime + duration; + + try { + while (System.currentTimeMillis() < endTime) { + Thread.sleep(1000); + long elapsed = (System.currentTimeMillis() - startTime) / 1000; + if (elapsed % 
BenchmarkConstants.PROGRESS_REPORT_INTERVAL_SECONDS == 0 && elapsed > 0) { + System.out.printf( + "[%02d:%02d] %d txns, %.1f txns/sec, %.1f%% success%n", + elapsed / 60, + elapsed % 60, Review Comment: The progress reporting output format shows elapsed time in MM:SS format but doesn't handle hours. For benchmarks longer than 60 minutes, the minutes field keeps growing past 59 and becomes hard to read (e.g., 90 minutes 30 seconds would show as "90:30" instead of "01:30:30"). Consider using HH:MM:SS format or documenting the limitation. ```suggestion long hours = elapsed / 3600; long minutes = (elapsed % 3600) / 60; long seconds = elapsed % 60; System.out.printf( "[%02d:%02d:%02d] %d txns, %.1f txns/sec, %.1f%% success%n", hours, minutes, seconds, ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java: ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.benchmark.executor; + +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker; +import org.apache.seata.benchmark.saga.InventorySagaService; +import org.apache.seata.benchmark.saga.OrderSagaService; +import org.apache.seata.benchmark.saga.PaymentSagaService; +import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.saga.engine.StateMachineEngine; +import org.apache.seata.saga.engine.config.AbstractStateMachineConfig; +import org.apache.seata.saga.engine.expression.ExpressionFactoryManager; +import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine; +import org.apache.seata.saga.statelang.domain.DomainConstants; +import org.apache.seata.saga.statelang.domain.ExecutionStatus; +import org.apache.seata.saga.statelang.domain.StateMachineInstance; +import org.apache.seata.tm.api.GlobalTransaction; +import org.apache.seata.tm.api.GlobalTransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED; +import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN; + +/** + * Saga mode transaction executor supporting both mock and real modes + * - branches == 0: Mock mode (simplified Saga simulation without state machine) + * - branches > 0: Real mode (state machine engine with compensation support) + */ +public class SagaModeExecutor implements TransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SagaModeExecutor.class); + private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0); + + private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga"; + private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga"; + + private final BenchmarkConfig config; + private StateMachineEngine stateMachineEngine; + private BenchmarkStateMachineConfig stateMachineConfig; + + public SagaModeExecutor(BenchmarkConfig config) { + this.config = config; + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + initRealMode(); + } else { + LOGGER.info("Saga mode executor initialized (simplified mock mode)"); + LOGGER.info("Note: Full Saga annotation support requires Spring framework integration"); + } + } + + private void initRealMode() { + LOGGER.info("Initializing Real Saga mode executor with state machine engine"); + + try { + // Create and configure state machine config + stateMachineConfig = new BenchmarkStateMachineConfig(); + stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage()); + stateMachineConfig.init(); + + // Create state machine engine + ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine(); + engine.setStateMachineConfig(stateMachineConfig); + this.stateMachineEngine = engine; + + LOGGER.info("Real Saga mode executor initialized"); + LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, ORDER_SAGA_NAME); + + } catch (Exception e) { + throw new RuntimeException("Failed to 
initialize Saga state machine engine", e); + } + } + + @Override + public TransactionRecord execute() { + if (isRealMode()) { + return executeRealMode(); + } else { + return executeMockMode(); + } + } + + private TransactionRecord executeMockMode() { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + long startTime = System.currentTimeMillis(); + String xid = null; + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + tx.begin(30000, "benchmark-saga-tx"); + xid = tx.getXid(); + + // Simulate Saga actions (forward phase) + for (int i = 0; i < branchCount; i++) { + simulateSagaAction(i); + } + + if (shouldRollback()) { + // Simulate compensation (backward phase) + for (int i = branchCount - 1; i >= 0; i--) { + simulateCompensation(i); + } + tx.rollback(); + status = STATUS_COMPENSATED; + } else { + tx.commit(); + status = STATUS_COMMITTED; + success = true; + } + + } catch (TransactionException e) { + LOGGER.debug("Transaction failed: {}", e.getMessage()); + status = STATUS_FAILED; + try { + if (tx.getStatus() != GlobalStatus.Rollbacked && tx.getStatus() != GlobalStatus.RollbackFailed) { + tx.rollback(); + } + } catch (TransactionException rollbackEx) { + LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage()); + } + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(xid, status, duration, branchCount, success); + } + + private TransactionRecord executeRealMode() { + long startTime = System.currentTimeMillis(); + String businessKey = "benchmark-" + BUSINESS_KEY_COUNTER.incrementAndGet(); + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + // Choose state machine based on branch count + String stateMachineName = branchCount >= 3 ? 
ORDER_SAGA_NAME : SIMPLE_SAGA_NAME; + + // Prepare start parameters + Map<String, Object> startParams = createStartParams(); + + // Execute state machine + StateMachineInstance instance = stateMachineEngine.startWithBusinessKey( + stateMachineName, stateMachineConfig.getDefaultTenantId(), businessKey, startParams); + + // Check execution result + ExecutionStatus executionStatus = instance.getStatus(); + ExecutionStatus compensationStatus = instance.getCompensationStatus(); + + if (ExecutionStatus.SU.equals(executionStatus)) { + status = STATUS_COMMITTED; + success = true; + } else if (ExecutionStatus.FA.equals(executionStatus)) { + if (compensationStatus != null) { + if (ExecutionStatus.SU.equals(compensationStatus)) { + status = STATUS_COMPENSATED; + } else { + status = STATUS_COMPENSATION_FAILED; + } + } else { + status = STATUS_FAILED; + } + } else if (ExecutionStatus.UN.equals(executionStatus)) { + status = STATUS_UNKNOWN; + } else { + status = executionStatus != null ? executionStatus.name() : STATUS_UNKNOWN; + } + + } catch (Exception e) { + LOGGER.debug("Saga execution failed: {}", e.getMessage()); + status = STATUS_FAILED; + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(businessKey, status, duration, branchCount, success); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private Map<String, Object> createStartParams() { + Map<String, Object> params = new HashMap<>(); + params.put("userId", "user-" + ThreadLocalRandom.current().nextInt(1000)); + params.put("productId", "product-" + ThreadLocalRandom.current().nextInt(100)); + params.put("quantity", ThreadLocalRandom.current().nextInt(10) + 1); + params.put("amount", new BigDecimal(ThreadLocalRandom.current().nextInt(1000) + 100)); + params.put("accountId", "account-" + ThreadLocalRandom.current().nextInt(1000)); + return params; + } + + private void simulateSagaAction(int branchId) { + // Simulated Saga forward action + // In real implementation, 
this would be a @CompensationBusinessAction annotated method + LOGGER.trace("Executing Saga action for branch {}", branchId); + } + + private void simulateCompensation(int branchId) { + // Simulated Saga compensation action + // In real implementation, this would be the compensationMethod + LOGGER.trace("Executing compensation for branch {}", branchId); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private boolean shouldRollback() { + return ThreadLocalRandom.current().nextInt(100) < config.getRollbackPercentage(); + } + + @Override + public void destroy() { + if (isRealMode()) { + destroyRealMode(); + } + LOGGER.info("Saga mode executor destroyed"); + } + + private void destroyRealMode() { + LOGGER.info("Destroying Real Saga mode resources"); + // StateMachineEngine doesn't have a close method + stateMachineEngine = null; + stateMachineConfig = null; + } + + /** + * Custom StateMachineConfig for benchmark testing. + */ + private static class BenchmarkStateMachineConfig extends AbstractStateMachineConfig { + + private int rollbackPercentage = 0; + + public void setRollbackPercentage(int rollbackPercentage) { + this.rollbackPercentage = rollbackPercentage; + } + + @Override + public void init() throws Exception { + // Load state machine definitions from classpath + try (InputStream simpleSagaStream = getClass() + .getClassLoader() + .getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json"); + InputStream orderSagaStream = getClass() + .getClassLoader() + .getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json")) { + + if (simpleSagaStream == null || orderSagaStream == null) { + throw new RuntimeException("Failed to load Saga state machine definitions from classpath"); + } + + // Read streams to byte arrays before closing, as super.init() may process them asynchronously + byte[] simpleBytes = readAllBytes(simpleSagaStream); + byte[] orderBytes = readAllBytes(orderSagaStream); + + setStateMachineDefInputStreamArray( + new 
InputStream[] {new ByteArrayInputStream(simpleBytes), new ByteArrayInputStream(orderBytes) + }); + + // Initialize parent config + super.init(); + + // Register SpEL expression factory for parameter evaluation + ExpressionFactoryManager expressionFactoryManager = getExpressionFactoryManager(); + SimpleSpelExpressionFactory spelExpressionFactory = new SimpleSpelExpressionFactory(); + // Register for default type (when expression doesn't start with $) + expressionFactoryManager.putExpressionFactory( + ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, spelExpressionFactory); + // Register for empty type (when expression is like $.xxx where type is empty string) + expressionFactoryManager.putExpressionFactory("", spelExpressionFactory); + + // Register benchmark services with the service invoker manager + BenchmarkServiceInvoker serviceInvoker = new BenchmarkServiceInvoker(); + + // Register services with configured rollback percentage + // Divide rollback percentage by 3 for each service so total probability is approximately correct + int serviceRollbackPct = rollbackPercentage > 0 ? Math.max(1, rollbackPercentage / 3) : 0; + + serviceInvoker.registerService("orderService", new OrderSagaService(serviceRollbackPct, 5)); + serviceInvoker.registerService("inventoryService", new InventorySagaService(serviceRollbackPct, 5)); + serviceInvoker.registerService("paymentService", new PaymentSagaService(serviceRollbackPct, 5)); Review Comment: Hardcoded simulated delay of 5ms in service constructors may not be representative of real-world scenarios. Consider making this configurable through BenchmarkConfig to allow users to simulate different service latency profiles. ```suggestion // Determine service delay (in milliseconds), defaulting to 5 ms if not configured. 
int serviceDelayMillis = 5; try { Class<?> configClass = Class.forName("org.apache.seata.benchmark.config.BenchmarkConfig"); java.lang.reflect.Method getInstanceMethod = configClass.getMethod("getInstance"); Object configInstance = getInstanceMethod.invoke(null); java.lang.reflect.Method getDelayMethod = configClass.getMethod("getServiceDelayMillis"); Object delayValue = getDelayMethod.invoke(configInstance); if (delayValue instanceof Number) { serviceDelayMillis = ((Number) delayValue).intValue(); } } catch (Throwable ignored) { // Fallback to default delay when BenchmarkConfig or the accessor methods are not available. } // Register services with configured rollback percentage // Divide rollback percentage by 3 for each service so total probability is approximately correct int serviceRollbackPct = rollbackPercentage > 0 ? Math.max(1, rollbackPercentage / 3) : 0; serviceInvoker.registerService( "orderService", new OrderSagaService(serviceRollbackPct, serviceDelayMillis)); serviceInvoker.registerService( "inventoryService", new InventorySagaService(serviceRollbackPct, serviceDelayMillis)); serviceInvoker.registerService( "paymentService", new PaymentSagaService(serviceRollbackPct, serviceDelayMillis)); ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java: ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.executor; + +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker; +import org.apache.seata.benchmark.saga.InventorySagaService; +import org.apache.seata.benchmark.saga.OrderSagaService; +import org.apache.seata.benchmark.saga.PaymentSagaService; +import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.saga.engine.StateMachineEngine; +import org.apache.seata.saga.engine.config.AbstractStateMachineConfig; +import org.apache.seata.saga.engine.expression.ExpressionFactoryManager; +import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine; +import org.apache.seata.saga.statelang.domain.DomainConstants; +import org.apache.seata.saga.statelang.domain.ExecutionStatus; +import org.apache.seata.saga.statelang.domain.StateMachineInstance; +import org.apache.seata.tm.api.GlobalTransaction; +import org.apache.seata.tm.api.GlobalTransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN; + +/** + * Saga mode transaction executor supporting both mock and real modes + * - branches == 0: Mock mode (simplified Saga simulation without state machine) + * - branches > 0: Real mode (state machine engine with compensation support) + */ +public class SagaModeExecutor implements TransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SagaModeExecutor.class); + private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0); + + private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga"; + private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga"; + + private final BenchmarkConfig config; + private StateMachineEngine stateMachineEngine; + private BenchmarkStateMachineConfig stateMachineConfig; + + public SagaModeExecutor(BenchmarkConfig config) { + this.config = config; + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + initRealMode(); + } else { + LOGGER.info("Saga mode executor initialized (simplified mock mode)"); + LOGGER.info("Note: Full Saga annotation support requires Spring framework integration"); + } + } + + private void initRealMode() { + LOGGER.info("Initializing Real Saga mode executor with state machine engine"); + + try { + // Create and configure state machine config + stateMachineConfig = new BenchmarkStateMachineConfig(); + stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage()); + stateMachineConfig.init(); + + // Create state machine engine + 
ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine(); + engine.setStateMachineConfig(stateMachineConfig); + this.stateMachineEngine = engine; + + LOGGER.info("Real Saga mode executor initialized"); + LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, ORDER_SAGA_NAME); + + } catch (Exception e) { + throw new RuntimeException("Failed to initialize Saga state machine engine", e); + } + } + + @Override + public TransactionRecord execute() { + if (isRealMode()) { + return executeRealMode(); + } else { + return executeMockMode(); + } + } + + private TransactionRecord executeMockMode() { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + long startTime = System.currentTimeMillis(); + String xid = null; + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + tx.begin(30000, "benchmark-saga-tx"); + xid = tx.getXid(); + + // Simulate Saga actions (forward phase) + for (int i = 0; i < branchCount; i++) { + simulateSagaAction(i); + } + + if (shouldRollback()) { + // Simulate compensation (backward phase) + for (int i = branchCount - 1; i >= 0; i--) { + simulateCompensation(i); + } + tx.rollback(); + status = STATUS_COMPENSATED; + } else { + tx.commit(); + status = STATUS_COMMITTED; + success = true; + } + + } catch (TransactionException e) { + LOGGER.debug("Transaction failed: {}", e.getMessage()); + status = STATUS_FAILED; + try { + if (tx.getStatus() != GlobalStatus.Rollbacked && tx.getStatus() != GlobalStatus.RollbackFailed) { + tx.rollback(); + } + } catch (TransactionException rollbackEx) { + LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage()); + } + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(xid, status, duration, branchCount, success); + } + + private TransactionRecord executeRealMode() { + long startTime = System.currentTimeMillis(); + String businessKey = "benchmark-" + 
BUSINESS_KEY_COUNTER.incrementAndGet(); + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + // Choose state machine based on branch count + String stateMachineName = branchCount >= 3 ? ORDER_SAGA_NAME : SIMPLE_SAGA_NAME; + + // Prepare start parameters + Map<String, Object> startParams = createStartParams(); + + // Execute state machine + StateMachineInstance instance = stateMachineEngine.startWithBusinessKey( + stateMachineName, stateMachineConfig.getDefaultTenantId(), businessKey, startParams); + + // Check execution result + ExecutionStatus executionStatus = instance.getStatus(); + ExecutionStatus compensationStatus = instance.getCompensationStatus(); + + if (ExecutionStatus.SU.equals(executionStatus)) { + status = STATUS_COMMITTED; + success = true; + } else if (ExecutionStatus.FA.equals(executionStatus)) { + if (compensationStatus != null) { + if (ExecutionStatus.SU.equals(compensationStatus)) { + status = STATUS_COMPENSATED; + } else { + status = STATUS_COMPENSATION_FAILED; + } + } else { + status = STATUS_FAILED; + } + } else if (ExecutionStatus.UN.equals(executionStatus)) { + status = STATUS_UNKNOWN; + } else { + status = executionStatus != null ? 
executionStatus.name() : STATUS_UNKNOWN; + } + + } catch (Exception e) { + LOGGER.debug("Saga execution failed: {}", e.getMessage()); + status = STATUS_FAILED; + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(businessKey, status, duration, branchCount, success); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private Map<String, Object> createStartParams() { + Map<String, Object> params = new HashMap<>(); + params.put("userId", "user-" + ThreadLocalRandom.current().nextInt(1000)); + params.put("productId", "product-" + ThreadLocalRandom.current().nextInt(100)); + params.put("quantity", ThreadLocalRandom.current().nextInt(10) + 1); + params.put("amount", new BigDecimal(ThreadLocalRandom.current().nextInt(1000) + 100)); + params.put("accountId", "account-" + ThreadLocalRandom.current().nextInt(1000)); + return params; + } + + private void simulateSagaAction(int branchId) { + // Simulated Saga forward action + // In real implementation, this would be a @CompensationBusinessAction annotated method + LOGGER.trace("Executing Saga action for branch {}", branchId); + } + + private void simulateCompensation(int branchId) { + // Simulated Saga compensation action + // In real implementation, this would be the compensationMethod + LOGGER.trace("Executing compensation for branch {}", branchId); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private boolean shouldRollback() { + return ThreadLocalRandom.current().nextInt(100) < config.getRollbackPercentage(); + } + + @Override + public void destroy() { + if (isRealMode()) { + destroyRealMode(); + } + LOGGER.info("Saga mode executor destroyed"); + } + + private void destroyRealMode() { + LOGGER.info("Destroying Real Saga mode resources"); + // StateMachineEngine doesn't have a close method + stateMachineEngine = null; + stateMachineConfig = null; + } + + /** + * Custom StateMachineConfig for benchmark testing. 
+ */ + private static class BenchmarkStateMachineConfig extends AbstractStateMachineConfig { + + private int rollbackPercentage = 0; + + public void setRollbackPercentage(int rollbackPercentage) { + this.rollbackPercentage = rollbackPercentage; + } + + @Override + public void init() throws Exception { + // Load state machine definitions from classpath + try (InputStream simpleSagaStream = getClass() + .getClassLoader() + .getResourceAsStream("seata/saga/statelang/benchmark_simple_saga.json"); + InputStream orderSagaStream = getClass() + .getClassLoader() + .getResourceAsStream("seata/saga/statelang/benchmark_order_saga.json")) { + + if (simpleSagaStream == null || orderSagaStream == null) { + throw new RuntimeException("Failed to load Saga state machine definitions from classpath"); + } + + // Read streams to byte arrays before closing, as super.init() may process them asynchronously + byte[] simpleBytes = readAllBytes(simpleSagaStream); + byte[] orderBytes = readAllBytes(orderSagaStream); + + setStateMachineDefInputStreamArray( + new InputStream[] {new ByteArrayInputStream(simpleBytes), new ByteArrayInputStream(orderBytes) + }); + + // Initialize parent config + super.init(); + + // Register SpEL expression factory for parameter evaluation + ExpressionFactoryManager expressionFactoryManager = getExpressionFactoryManager(); + SimpleSpelExpressionFactory spelExpressionFactory = new SimpleSpelExpressionFactory(); + // Register for default type (when expression doesn't start with $) + expressionFactoryManager.putExpressionFactory( + ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, spelExpressionFactory); + // Register for empty type (when expression is like $.xxx where type is empty string) + expressionFactoryManager.putExpressionFactory("", spelExpressionFactory); + + // Register benchmark services with the service invoker manager + BenchmarkServiceInvoker serviceInvoker = new BenchmarkServiceInvoker(); + + // Register services with configured rollback percentage + 
// Divide rollback percentage by 3 for each service so total probability is approximately correct + int serviceRollbackPct = rollbackPercentage > 0 ? Math.max(1, rollbackPercentage / 3) : 0; + + serviceInvoker.registerService("orderService", new OrderSagaService(serviceRollbackPct, 5)); + serviceInvoker.registerService("inventoryService", new InventorySagaService(serviceRollbackPct, 5)); + serviceInvoker.registerService("paymentService", new PaymentSagaService(serviceRollbackPct, 5)); + + // Register the service invoker for different service types + getServiceInvokerManager().putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN, serviceInvoker); + } + } + + private byte[] readAllBytes(InputStream is) throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + byte[] data = new byte[8192]; + int nRead; + while ((nRead = is.read(data, 0, data.length)) != -1) { + buffer.write(data, 0, nRead); Review Comment: The variable 'nRead' follows Java naming conventions, but the typical convention in I/O operations is to use 'bytesRead' or 'numRead' for better clarity about what the variable represents. ```suggestion int bytesRead; while ((bytesRead = is.read(data, 0, data.length)) != -1) { buffer.write(data, 0, bytesRead); ``` ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/SagaModeExecutor.java: ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.executor; + +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.model.TransactionRecord; +import org.apache.seata.benchmark.saga.BenchmarkServiceInvoker; +import org.apache.seata.benchmark.saga.InventorySagaService; +import org.apache.seata.benchmark.saga.OrderSagaService; +import org.apache.seata.benchmark.saga.PaymentSagaService; +import org.apache.seata.benchmark.saga.SimpleSpelExpressionFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.saga.engine.StateMachineEngine; +import org.apache.seata.saga.engine.config.AbstractStateMachineConfig; +import org.apache.seata.saga.engine.expression.ExpressionFactoryManager; +import org.apache.seata.saga.engine.impl.ProcessCtrlStateMachineEngine; +import org.apache.seata.saga.statelang.domain.DomainConstants; +import org.apache.seata.saga.statelang.domain.ExecutionStatus; +import org.apache.seata.saga.statelang.domain.StateMachineInstance; +import org.apache.seata.tm.api.GlobalTransaction; +import org.apache.seata.tm.api.GlobalTransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static 
org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMMITTED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_COMPENSATION_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_FAILED; +import static org.apache.seata.benchmark.constant.BenchmarkConstants.STATUS_UNKNOWN; + +/** + * Saga mode transaction executor supporting both mock and real modes + * - branches == 0: Mock mode (simplified Saga simulation without state machine) + * - branches > 0: Real mode (state machine engine with compensation support) + */ +public class SagaModeExecutor implements TransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SagaModeExecutor.class); + private static final AtomicLong BUSINESS_KEY_COUNTER = new AtomicLong(0); + + private static final String SIMPLE_SAGA_NAME = "benchmarkSimpleSaga"; + private static final String ORDER_SAGA_NAME = "benchmarkOrderSaga"; + + private final BenchmarkConfig config; + private StateMachineEngine stateMachineEngine; + private BenchmarkStateMachineConfig stateMachineConfig; + + public SagaModeExecutor(BenchmarkConfig config) { + this.config = config; + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + initRealMode(); + } else { + LOGGER.info("Saga mode executor initialized (simplified mock mode)"); + LOGGER.info("Note: Full Saga annotation support requires Spring framework integration"); + } + } + + private void initRealMode() { + LOGGER.info("Initializing Real Saga mode executor with state machine engine"); + + try { + // Create and configure state machine config + stateMachineConfig = new BenchmarkStateMachineConfig(); + stateMachineConfig.setRollbackPercentage(config.getRollbackPercentage()); + stateMachineConfig.init(); + + // Create state machine engine + 
ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine(); + engine.setStateMachineConfig(stateMachineConfig); + this.stateMachineEngine = engine; + + LOGGER.info("Real Saga mode executor initialized"); + LOGGER.info("Available state machines: {}, {}", SIMPLE_SAGA_NAME, ORDER_SAGA_NAME); + + } catch (Exception e) { + throw new RuntimeException("Failed to initialize Saga state machine engine", e); + } + } + + @Override + public TransactionRecord execute() { + if (isRealMode()) { + return executeRealMode(); + } else { + return executeMockMode(); + } + } + + private TransactionRecord executeMockMode() { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + long startTime = System.currentTimeMillis(); + String xid = null; + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + tx.begin(30000, "benchmark-saga-tx"); + xid = tx.getXid(); + + // Simulate Saga actions (forward phase) + for (int i = 0; i < branchCount; i++) { + simulateSagaAction(i); + } + + if (shouldRollback()) { + // Simulate compensation (backward phase) + for (int i = branchCount - 1; i >= 0; i--) { + simulateCompensation(i); + } + tx.rollback(); + status = STATUS_COMPENSATED; + } else { + tx.commit(); + status = STATUS_COMMITTED; + success = true; + } + + } catch (TransactionException e) { + LOGGER.debug("Transaction failed: {}", e.getMessage()); + status = STATUS_FAILED; + try { + if (tx.getStatus() != GlobalStatus.Rollbacked && tx.getStatus() != GlobalStatus.RollbackFailed) { + tx.rollback(); + } + } catch (TransactionException rollbackEx) { + LOGGER.debug("Rollback failed: {}", rollbackEx.getMessage()); + } + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(xid, status, duration, branchCount, success); + } + + private TransactionRecord executeRealMode() { + long startTime = System.currentTimeMillis(); + String businessKey = "benchmark-" + 
BUSINESS_KEY_COUNTER.incrementAndGet(); + String status = STATUS_UNKNOWN; + int branchCount = config.getBranches(); + boolean success = false; + + try { + // Choose state machine based on branch count + String stateMachineName = branchCount >= 3 ? ORDER_SAGA_NAME : SIMPLE_SAGA_NAME; + + // Prepare start parameters + Map<String, Object> startParams = createStartParams(); + + // Execute state machine + StateMachineInstance instance = stateMachineEngine.startWithBusinessKey( + stateMachineName, stateMachineConfig.getDefaultTenantId(), businessKey, startParams); + + // Check execution result + ExecutionStatus executionStatus = instance.getStatus(); + ExecutionStatus compensationStatus = instance.getCompensationStatus(); + + if (ExecutionStatus.SU.equals(executionStatus)) { + status = STATUS_COMMITTED; + success = true; + } else if (ExecutionStatus.FA.equals(executionStatus)) { + if (compensationStatus != null) { + if (ExecutionStatus.SU.equals(compensationStatus)) { + status = STATUS_COMPENSATED; + } else { + status = STATUS_COMPENSATION_FAILED; + } + } else { + status = STATUS_FAILED; + } + } else if (ExecutionStatus.UN.equals(executionStatus)) { + status = STATUS_UNKNOWN; + } else { + status = executionStatus != null ? 
executionStatus.name() : STATUS_UNKNOWN; + } + + } catch (Exception e) { + LOGGER.debug("Saga execution failed: {}", e.getMessage()); + status = STATUS_FAILED; + } + + long duration = System.currentTimeMillis() - startTime; + return new TransactionRecord(businessKey, status, duration, branchCount, success); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") + private Map<String, Object> createStartParams() { + Map<String, Object> params = new HashMap<>(); + params.put("userId", "user-" + ThreadLocalRandom.current().nextInt(1000)); + params.put("productId", "product-" + ThreadLocalRandom.current().nextInt(100)); + params.put("quantity", ThreadLocalRandom.current().nextInt(10) + 1); + params.put("amount", new BigDecimal(ThreadLocalRandom.current().nextInt(1000) + 100)); + params.put("accountId", "account-" + ThreadLocalRandom.current().nextInt(1000)); + return params; + } + + private void simulateSagaAction(int branchId) { + // Simulated Saga forward action + // In real implementation, this would be a @CompensationBusinessAction annotated method + LOGGER.trace("Executing Saga action for branch {}", branchId); + } + + private void simulateCompensation(int branchId) { + // Simulated Saga compensation action + // In real implementation, this would be the compensationMethod + LOGGER.trace("Executing compensation for branch {}", branchId); + } + + @SuppressWarnings("lgtm[java/insecure-randomness]") Review Comment: The suppress annotation with LGTM tag references a deprecated static analysis tool (LGTM.com). Consider using modern alternatives like CodeQL or removing these annotations. ########## test-suite/seata-benchmark-cli/src/main/java/org/apache/seata/benchmark/executor/ATModeExecutor.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.benchmark.executor; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.seata.benchmark.config.BenchmarkConfig; +import org.apache.seata.benchmark.constant.BenchmarkConstants; +import org.apache.seata.rm.datasource.DataSourceProxy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.ThreadLocalRandom; + +/** + * AT mode transaction executor supporting both empty and real transaction modes + * - branches == 0: Empty transaction mode (pure Seata protocol overhead testing) + * - branches > 0: Real mode with MySQL database (via Testcontainers) + */ +public class ATModeExecutor extends AbstractTransactionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(ATModeExecutor.class); + + private MySQLContainer<?> mysqlContainer; + private HikariDataSource rawDataSource; + private DataSourceProxy dataSourceProxy; + + public ATModeExecutor(BenchmarkConfig config) { + super(config); + } + + private boolean isRealMode() { + return config.getBranches() > 0; + } + + @Override + public void init() { + if (isRealMode()) { + 
LOGGER.info("Initializing AT mode executor (MySQL via Testcontainers)"); + initRealMode(); + } else { + LOGGER.info("AT mode executor initialized (empty transaction mode)"); + } + } + + private void initRealMode() { + // Start MySQL container + startMySQLContainer(); + + // Create HikariCP connection pool + createDataSource(); + + // Initialize database schema and data + initDatabase(); + + // Wrap with Seata DataSourceProxy for AT mode + dataSourceProxy = new DataSourceProxy(rawDataSource); + + LOGGER.info("DataSourceProxy initialized, dbType: {}", dataSourceProxy.getDbType()); + LOGGER.info("Real AT mode executor initialized with {} accounts", BenchmarkConstants.ACCOUNT_COUNT); Review Comment: Potential resource leak: If an exception occurs during initialization between starting the MySQL container (line 91) and creating the DataSourceProxy (line 77), the MySQL container will not be stopped. Consider wrapping the initialization in try-catch and ensuring proper cleanup, or using try-with-resources pattern where applicable. ```suggestion /** * Cleanup resources if real mode initialization fails. 
*/ private void cleanupResourcesOnFailure() { if (rawDataSource != null) { try { rawDataSource.close(); } catch (Exception e) { LOGGER.warn("Failed to close HikariCP DataSource during cleanup", e); } finally { rawDataSource = null; } } if (mysqlContainer != null) { try { mysqlContainer.stop(); } catch (Exception e) { LOGGER.warn("Failed to stop MySQL container during cleanup", e); } finally { mysqlContainer = null; } } } private void initRealMode() { try { // Start MySQL container startMySQLContainer(); // Create HikariCP connection pool createDataSource(); // Initialize database schema and data initDatabase(); // Wrap with Seata DataSourceProxy for AT mode dataSourceProxy = new DataSourceProxy(rawDataSource); LOGGER.info("DataSourceProxy initialized, dbType: {}", dataSourceProxy.getDbType()); LOGGER.info("Real AT mode executor initialized with {} accounts", BenchmarkConstants.ACCOUNT_COUNT); } catch (Exception e) { cleanupResourcesOnFailure(); throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException("Failed to initialize real AT mode executor", e); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
