lihjChina opened a new issue, #9870:
URL: https://github.com/apache/seatunnel/issues/9870

   ### Search before asking
   
   - [x] I have searched the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
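   I ran `run.sh 20`, i.e. the script under "Running Command" with a concurrency of 20. My source MySQL database has approximately 3.8 million records. The run fails with `CheckpointException: Aggregate commit error` (full stack trace under "Error Exception").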
   
   ### SeaTunnel Version
   
   dev
   
   ### SeaTunnel Config
   
   ```conf
   # Single-job config from the report. Every setting stays on one line:
   # HOCON quoted strings (e.g. the query) may not contain raw newlines.
   env {
   "job.mode"="BATCH"
   "job.name"="task_12_20240308"
   }
   source {
           Jdbc {
           url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false"
           driver="com.mysql.cj.jdbc.Driver"
               schema="test"
               user="root"
               password="xxx"
           query="SELECT `pl_curr_cd`,`murex_intrnl_tran_id`,`murex_port_cd`,`murex_dts_ref`,`batch_dt` FROM `bj`.`ds_lv1_pl_details_test` WHERE 1=1 "
           "fetch_size"="1000"
               "result_table_name"="E000001_source_1_12"
               "parallelism"=1
               "empty_data_strategy"=true
               "table_path"=1
           }
   }
   transform {
   }
   sink {
           LocalFile {
           path="/dsg/20240308"
                source_table_name="E000001_source_1_12"
                tmp_path="/tmp/seatunnel"
                custom_filename=true
                file_name_expression="ds_lv1_pl_details_test.txt_${transactionId}"
                is_enable_transaction=false
                filename_time_format="yyyy.MM.dd"
                file_format_type="text"
                field_delimiter="\u0001"
                row_delimiter="\n"
                sink_columns=["pl_curr_cd","murex_intrnl_tran_id","murex_port_cd","murex_dts_ref","batch_dt"]
                batch_size="1000000"
                compress_codec="none"
                date_format="yyyy-MM-dd"
                datetime_format="yyyy-MM-dd HH:mm:ss"
                time_format="HH:mm:ss"
           encoding="UTF-8"
           }
   }
   ```
   
   ### Running Command
   
   ```shell
   CONCURRENCY=1                              # number of concurrent tasks; tune to the server's capacity
   # Relative path: resolved against the current working directory, not the
   # script's own location.
   SEATUNNEL_SCRIPT="../bin/seatunnel.sh"
   TEMP_DIR="./temp_configs"                  # directory for the generated job configs

   # Create the config directory up front. If this fails, every task would
   # fail to write its config, so stop immediately instead.
   mkdir -p -- "$TEMP_DIR" || { echo "cannot create $TEMP_DIR" >&2; exit 1; }
   
   # Print a random date formatted as YYYYMMDD (no trailing newline):
   # year 2020-2024, month 01-12, day 01-28. Days stop at 28 so that every
   # generated date exists in every month, February included.
   generate_random_date() {
       # Words are expanded left to right, so RANDOM is drawn for the year,
       # then the month, then the day.
       printf '%d%02d%02d' \
           "$((2020 + RANDOM % 5))" \
           "$((1 + RANDOM % 12))" \
           "$((1 + RANDOM % 28))"
   }
   
   # Generate one SeaTunnel job config (MySQL JDBC source -> SFTP file sink),
   # write it under TEMP_DIR and run it with SEATUNNEL_SCRIPT.
   # Arguments: $1 - task id; makes the job name, source table name and
   #                 config file name unique within one run
   # Globals:   TEMP_DIR, SEATUNNEL_SCRIPT (read)
   # Outputs:   start / result messages on stdout
   # Returns:   the exit status of SEATUNNEL_SCRIPT, or 1 if the config
   #            file cannot be written
   execute_task() {
       local task_id=$1
       local random_date
       local exit_code=0
       # Declared separately so a failure of the substitution is not masked.
       random_date=$(generate_random_date)

       local config_file="${TEMP_DIR}/task_${task_id}_${random_date}.conf"

       # The heredoc expands ${task_id} and ${random_date}; backticks and
       # \${transactionId} are backslash-escaped so they reach the config
       # literally. The sink reads the table registered by this task's source,
       # so source_table_name matches result_table_name.
       # NOTE(review): dates are random, so concurrent tasks can draw the same
       # date and therefore write to the same sink path.
       cat > "$config_file" << EOF || return 1
   env {
   "job.mode"="BATCH"
   "job.name"="task_${task_id}_${random_date}"
   }
   source {
       Jdbc {
           url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false"
           driver="com.mysql.cj.jdbc.Driver"
           schema="test"
           user="root"
           password="xxx"
           query="SELECT \`pl_curr_cd\`,\`murex_intrnl_tran_id\`,\`murex_port_cd\`,\`murex_dts_ref\`,\`batch_dt\` FROM \`bj\`.\`ds_lv1_pl_details_test\` WHERE 1=1 "
           "fetch_size"="1000"
           "result_table_name"="E000001_source_1_${task_id}"
           "parallelism"=1
           "empty_data_strategy"=true
           "table_path"=1
       }
   }
   transform {
   }
   sink {
       SftpFile {
           host="172.20.100.238"
           port="22"
           user="root"
           password="xxx"
           path="/test/${random_date}"
           source_table_name="E000001_source_1_${task_id}"
           tmp_path="/tmp/seatunnel"
           custom_filename=true
           file_name_expression="ds_lv1_pl_details_test.txt_\${transactionId}"
           is_enable_transaction=false
           filename_time_format="yyyy.MM.dd"
           file_format_type="text"
           field_delimiter="\u0001"
           row_delimiter="\n"
           sink_columns=["pl_curr_cd","murex_intrnl_tran_id","murex_port_cd","murex_dts_ref","batch_dt"]
           batch_size="1000000"
           compress_codec="none"
           date_format="yyyy-MM-dd"
           datetime_format="yyyy-MM-dd HH:mm:ss"
           time_format="HH:mm:ss"
           encoding="UTF-8"
       }
   }
   EOF

       echo "$(date '+%Y-%m-%d %H:%M:%S') - 开始执行任务 $task_id,随机日期: $random_date"

       # Run the SeaTunnel job and keep its exit status.
       "$SEATUNNEL_SCRIPT" -c "$config_file" || exit_code=$?

       if (( exit_code == 0 )); then
           echo "$(date '+%Y-%m-%d %H:%M:%S') - 任务 $task_id (日期: $random_date) 执行成功"
       else
           echo "$(date '+%Y-%m-%d %H:%M:%S') - 任务 $task_id (日期: $random_date) 执行失败,退出码: $exit_code"
       fi

       return "$exit_code"
   }
   
   # Launch CONCURRENCY execute_task jobs in the background, wait for all of
   # them and print a per-task result summary.
   # Globals:   CONCURRENCY (read)
   # Outputs:   progress and summary lines on stdout
   # Returns:   0 if every task succeeded, 1 if any task failed
   main() {
       echo "$(date '+%Y-%m-%d %H:%M:%S') - 开始并发执行任务,并发数: $CONCURRENCY"
       # Per-task bookkeeping, indexed by task id (1..CONCURRENCY).
       local -a pids=() task_ids=() exit_codes=()
       local i
       local failed=0

       # Start all tasks concurrently.
       for ((i = 1; i <= CONCURRENCY; i++)); do
           execute_task "$i" &
           pids[i]=$!
           task_ids[i]=$i
       done

       # Wait for every task and record its exit status.
       for ((i = 1; i <= CONCURRENCY; i++)); do
           exit_codes[i]=0
           wait "${pids[i]}" || exit_codes[i]=$?
       done

       # Print the result summary.
       echo ""
       echo "======= 任务执行结果摘要 ======="
       for ((i = 1; i <= CONCURRENCY; i++)); do
           if (( exit_codes[i] == 0 )); then
               echo "任务 ${task_ids[i]}: 成功"
           else
               echo "任务 ${task_ids[i]}: 失败 (退出码: ${exit_codes[i]})"
               failed=1
           fi
       done

       echo "$(date '+%Y-%m-%d %H:%M:%S') - 所有任务执行完成"
       # Propagate failures so the script's exit status reflects them.
       return "$failed"
   }
   # Run main with an optional concurrency override.
   # Arguments: $1 - desired concurrency (optional; must be a positive integer)
   # Globals:   CONCURRENCY (written when $1 is valid)
   # Outputs:   a warning on stderr when $1 is non-empty but invalid
   # Returns:   main's exit status
   main_with_args() {
       local custom_concurrency=${1-}

       # Reject 0 (would run nothing) and leading zeros: bash arithmetic reads
       # "08" as an invalid octal constant in main's loop condition.
       if [[ "$custom_concurrency" =~ ^[1-9][0-9]*$ ]]; then
           CONCURRENCY="$custom_concurrency"
       elif [[ -n "$custom_concurrency" ]]; then
           echo "invalid concurrency '$custom_concurrency', using $CONCURRENCY" >&2
       fi

       main
   }
   # Delete the task_*.conf files generated in TEMP_DIR. Meant to run as an
   # EXIT trap. Other files in TEMP_DIR, and the directory itself, are kept.
   # Globals: TEMP_DIR (read)
   # Returns: 0
   cleanup() {
       echo "清理临时配置文件..."
       # Never expand to "/task_*.conf" when TEMP_DIR is empty or unset.
       [ -n "${TEMP_DIR:-}" ] || return 0
       # -f: an unmatched glob (no configs generated) is not an error.
       rm -f -- "$TEMP_DIR"/task_*.conf
       return 0
   }
   # Run cleanup whenever the script exits.
   trap cleanup EXIT

   # Hand the optional first argument to main_with_args as the concurrency
   # override. With no argument (or an empty one) main_with_args just calls
   # main, which is what calling main directly would do.
   main_with_args "${1-}"
   ```
   
   ### Error Exception
   
   ```log
   2025-09-17 13:35:15,739 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
Fatal Error, 
   
   2025-09-17 13:35:15,739 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
Please submit bug report in https://github.com/apache/seatunnel/issues
   
   2025-09-17 13:35:15,739 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
Reason:SeaTunnel job executed failed 
   
   2025-09-17 13:35:15,740 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
Exception 
StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: 
SeaTunnel job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: 
CheckpointCoordinator inside have error.
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:282)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:278)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:398)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:174)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
        at 
org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate 
commit error.
        at 
org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.restoreState(SinkAggregatedCommitterTask.java:278)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:107)
        at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   
        ... 12 more
   
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
        ... 2 more
    
   2025-09-17 13:35:15,740 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
   
===============================================================================
   
   
   
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: 
CheckpointCoordinator inside have error.
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:282)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:278)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointManager.java:398)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:174)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
        at 
org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate 
commit error.
        at 
org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.restoreState(SinkAggregatedCommitterTask.java:278)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:107)
        at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   
        ... 12 more
   
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
        ... 2 more
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to