lihjChina opened a new issue, #9870: URL: https://github.com/apache/seatunnel/issues/9870
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

I ran `run.sh 20` (the script under Running Command, with the concurrency set to 20). My source MySQL database has approximately 3.8 million records. The job fails with the exception shown under Error Exception.

### SeaTunnel Version

dev

### SeaTunnel Config

```conf
env {
  "job.mode"="BATCH"
  "job.name"="task_12_20240308"
}
source {
  Jdbc {
    url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false"
    driver="com.mysql.cj.jdbc.Driver"
    schema="test"
    user="root"
    password="xxx"
    query="SELECT `pl_curr_cd`,`murex_intrnl_tran_id`,`murex_port_cd`,`murex_dts_ref`,`batch_dt` FROM `bj`.`ds_lv1_pl_details_test` WHERE 1=1 "
    "fetch_size"="1000"
    "result_table_name"="E000001_source_1_12"
    "parallelism"=1
    "empty_data_strategy"=true
    "table_path"=1
  }
}
transform {
}
sink {
  LocalFile {
    path="/dsg/20240308"
    source_table_name="E000001_source_1_12"
    tmp_path="/tmp/seatunnel"
    custom_filename=true
    file_name_expression="ds_lv1_pl_details_test.txt_${transactionId}"
    is_enable_transaction=false
    filename_time_format="yyyy.MM.dd"
    file_format_type="text"
    field_delimiter="\u0001"
    row_delimiter="\n"
    sink_columns=["pl_curr_cd","murex_intrnl_tran_id","murex_port_cd","murex_dts_ref","batch_dt"]
    batch_size="1000000"
    compress_codec="none"
    date_format="yyyy-MM-dd"
    datetime_format="yyyy-MM-dd HH:mm:ss"
    time_format="HH:mm:ss"
    encoding="UTF-8"
  }
}
```

### Running Command

```shell
CONCURRENCY=1  # Number of concurrent jobs; adjust according to server capacity
SEATUNNEL_SCRIPT="../bin/seatunnel.sh"
TEMP_DIR="./temp_configs"  # Directory for temporary config files

# Create the temporary directory for config files
mkdir -p "$TEMP_DIR"

# Generate a random date (format: YYYYMMDD)
generate_random_date() {
    local year=$((2020 + RANDOM % 5))  # 2020-2024
    local month=$((1 + RANDOM % 12))
    local day=$((1 + RANDOM % 28))     # Avoid edge cases such as Feb 29
    # Zero-pad month and day to two digits
    printf "%d%02d%02d" "$year" "$month" "$day"
}

# Generate a config file and run one task
execute_task() {
    local task_id=$1
    local random_date=$(generate_random_date)

    # Build the config file content
    local config_content=$(cat << EOF
env {
  "job.mode"="BATCH"
  "job.name"="task_${task_id}_${random_date}"
}
source {
  Jdbc {
    url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false"
    driver="com.mysql.cj.jdbc.Driver"
    schema="test"
    user="root"
    password="xxx"
    query="SELECT \`pl_curr_cd\`,\`murex_intrnl_tran_id\`,\`murex_port_cd\`,\`murex_dts_ref\`,\`batch_dt\` FROM \`bj\`.\`ds_lv1_pl_details_test\` WHERE 1=1 "
    "fetch_size"="1000"
    "result_table_name"="E000001_source_1_${task_id}"
    "parallelism"=1
    "empty_data_strategy"=true
    "table_path"=1
  }
}
transform {
}
sink {
  SftpFile {
    host="172.20.100.238"
    port="22"
    user="root"
    password="xxx"
    path="/test/${random_date}"
    source_table_name="E000001_source_1"
    tmp_path="/tmp/seatunnel"
    custom_filename=true
    file_name_expression="ds_lv1_pl_details_test.txt_\${transactionId}"
    is_enable_transaction=false
    filename_time_format="yyyy.MM.dd"
    file_format_type="text"
    field_delimiter="\u0001"
    row_delimiter="\n"
    sink_columns=["pl_curr_cd","murex_intrnl_tran_id","murex_port_cd","murex_dts_ref","batch_dt"]
    batch_size="1000000"
    compress_codec="none"
    date_format="yyyy-MM-dd"
    datetime_format="yyyy-MM-dd HH:mm:ss"
    time_format="HH:mm:ss"
    encoding="UTF-8"
  }
}
EOF
)

    # Write the temporary config file
    local config_file="${TEMP_DIR}/task_${task_id}_${random_date}.conf"
    echo "$config_content" > "$config_file"

    echo "$(date '+%Y-%m-%d %H:%M:%S') - Starting task $task_id, random date: $random_date"

    # Run the SeaTunnel job
    "$SEATUNNEL_SCRIPT" -c "$config_file"
    local exit_code=$?
    if [ $exit_code -eq 0 ]; then
        echo "$(date '+%Y-%m-%d %H:%M:%S') - Task $task_id (date: $random_date) succeeded"
    else
        echo "$(date '+%Y-%m-%d %H:%M:%S') - Task $task_id (date: $random_date) failed, exit code: $exit_code"
    fi
    return $exit_code
}

# Main logic
main() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Starting concurrent tasks, concurrency: $CONCURRENCY"

    # Arrays for process IDs and dates
    declare -a pids
    declare -a task_dates
    declare -a task_ids

    # Launch concurrent tasks
    for ((i=1; i<=CONCURRENCY; i++)); do
        execute_task $i &
        pids[$i]=$!
        task_ids[$i]=$i
    done

    # Wait for all tasks to finish
    for ((i=1; i<=CONCURRENCY; i++)); do
        wait ${pids[$i]}
        exit_codes[$i]=$?
    done

    # Print a summary of the results
    echo ""
    echo "======= Task Result Summary ======="
    for ((i=1; i<=CONCURRENCY; i++)); do
        if [ ${exit_codes[$i]} -eq 0 ]; then
            echo "Task ${task_ids[$i]}: succeeded"
        else
            echo "Task ${task_ids[$i]}: failed (exit code: ${exit_codes[$i]})"
        fi
    done

    echo "$(date '+%Y-%m-%d %H:%M:%S') - All tasks finished"
}

# Variant that takes the concurrency as an argument
main_with_args() {
    local custom_concurrency=$1
    if [ -n "$custom_concurrency" ] && [[ "$custom_concurrency" =~ ^[0-9]+$ ]]; then
        CONCURRENCY="$custom_concurrency"
    fi
    main
}

# Clean up temporary files
cleanup() {
    echo "Cleaning up temporary config files..."
}

# Run cleanup on exit
trap cleanup EXIT

# Run the main function
if [ $# -ge 1 ]; then
    main_with_args "$1"
else
    main
fi
```

### Error Exception

```log
2025-09-17 13:35:15,739 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,
2025-09-17 13:35:15,739 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2025-09-17 13:35:15,739 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed
2025-09-17 13:35:15,740 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:282)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:278)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:398)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:174)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
	at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate commit error.
	at org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.restoreState(SinkAggregatedCommitterTask.java:278)
	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:107)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	... 12 more
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
2025-09-17 13:35:15,740 ERROR [o.a.s.c.s.SeaTunnel ] [main] - ===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:282)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:278)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:398)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:174)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
	at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate commit error.
	at org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.restoreState(SinkAggregatedCommitterTask.java:278)
	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:107)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	... 12 more
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
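The `generate_random_date` helper from the Running Command script can be tested on its own, separately from the SeaTunnel jobs. The block below is a minimal sketch and was not part of the original report. The helper body is copied from the script with its comments translated. `check_random_dates` is a hypothetical function added for illustration. It assumes bash, because it relies on `$RANDOM`, `[[ =~ ]]`, and arithmetic `for` loops.

```shell
#!/usr/bin/env bash

# Same helper as in the Running Command script: prints a random date as YYYYMMDD.
generate_random_date() {
    local year=$((2020 + RANDOM % 5))  # 2020-2024
    local month=$((1 + RANDOM % 12))
    local day=$((1 + RANDOM % 28))     # Avoid edge cases such as Feb 29
    # Zero-pad month and day to two digits
    printf "%d%02d%02d" "$year" "$month" "$day"
}

# Hypothetical check (not from the original script): sample the helper
# repeatedly and confirm each value is a YYYYMMDD date in 2020-2024,
# months 01-12, days 01-28.
check_random_dates() {
    local samples=${1:-1000} i d
    for ((i = 0; i < samples; i++)); do
        d=$(generate_random_date)
        if [[ ! "$d" =~ ^202[0-4](0[1-9]|1[0-2])(0[1-9]|1[0-9]|2[0-8])$ ]]; then
            echo "invalid date: $d" >&2
            return 1
        fi
    done
    echo "ok: $samples samples"
}

check_random_dates 1000
```

Because every value falls inside that range, each task's output `path` is always one of the `/test/YYYYMMDD` directories that the script generates.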
