inkkim opened a new issue, #6229:
URL: https://github.com/apache/seatunnel/issues/6229

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   When I try to do JDBC source(Oracle, PostgreSQL) to SFTP sink(parquet type), 
I got this error.
   
   While data source and sink are in progress, we try to save checkpoints at 
regular intervals, but the pending and wait states are repeated. It appears 
that an error occurred during the final aggregation process. This happens 
equally in both cluster mode and local mode.
   
   ```bash
   
root@ubuntu-2004:/tmp/seatunnel/seatunnel/799451662937751553/09396c8f64/T_799451662937751553_09396c8f64_0_1/NON_PARTITION#
 ls -alh
   total 1.8G
   drwxrwxr-x 2 ink ink 4.0K Jan 16 10:38 .
   drwxrwxr-x 3 ink ink 4.0K Jan 16 10:35 ..
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:39 
T_799451662937751553_09396c8f64_0_1_0.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:39 
T_799451662937751553_09396c8f64_0_1_1.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:39 
T_799451662937751553_09396c8f64_0_1_2.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:39 
T_799451662937751553_09396c8f64_0_1_3.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:39 
T_799451662937751553_09396c8f64_0_1_4.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:39 
T_799451662937751553_09396c8f64_0_1_5.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:39 
T_799451662937751553_09396c8f64_0_1_6.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:40 
T_799451662937751553_09396c8f64_0_1_7.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:40 
T_799451662937751553_09396c8f64_0_1_8.parquet
   -rw-rw-r-- 1 ink ink 184M Jan 16 10:40 
T_799451662937751553_09396c8f64_0_1_9.parquet
   ```
   
   
   ### SeaTunnel Version
   
   SeaTunnel 2.3.3
   
   ### SeaTunnel Config
   
   ```conf
   # /usr/local/seatunnel/config/seatunnel.yaml
   seatunnel:
     engine:
       history-job-expire-minutes: 1440
       backup-count: 1
       queue-type: blockingqueue
       print-execution-info-interval: 60
       print-job-metrics-info-interval: 60
       slot-service:
         dynamic-slot: true
       checkpoint:
         interval: 10000
         timeout: 600000
         storage:
           type: hdfs
           max-retained: 3
           plugin-config:
             namespace: /dev/
             storage.type: hdfs
             fs.defaultFS: hdfs://namenode:8020 # Ensure that the directory has 
written permission
   
   
   # /usr/local/seatunnel/config/v2.batch.config.jdbc-oracle-sftp-parquet
   
   env {
     # You can set SeaTunnel environment configuration here
     execution.parallelism = 3
     job.mode = "BATCH"
     checkpoint.interval = 10000
     #execution.checkpoint.interval = 10000
     #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
   }
   
   source {
     # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
     Jdbc {
       url = "jdbc:oracle:thin:@oracle-server:19070:XE"
       driver = "oracle.jdbc.OracleDriver"
       user = "user"
       password = "StrongP@ssword"
       query = "select * from USER.TEST_ITEM"
       result_table_name = "TEST_ITEM"
       }
     }
   
   sink {
       SftpFile {
       path = "/tmp/sink/oracle2parquet"
       host = "sftp-host"
       port = 22
       user = ink
       password = "StrongP@ssword"
       file_format_type = "parquet"
       source_table_name = "TEST_ITEM"
     }
   ```
   
   
   ### Running Command
   
   ```shell
   /usr/local/seatunnel/bin/seatunnel.sh --config 
/usr/local/seatunnel/config/v2.batch.config.jdbc-oracle-sftp-parquet
   ```
   
   
   ### Error Exception
   
   ```log
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Checkpoint 
notify complete failed
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:255)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:251)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.notifyCompleted(CheckpointCoordinator.java:328)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:736)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:480)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.util.concurrent.CompletionException: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate 
commit error.
        at 
org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.notifyCheckpointComplete(SinkAggregatedCommitterTask.java:300)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
        at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at 
java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1300)
        at 
java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at 
com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243)
        at 
com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234)
        at 
com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230)
        at 
com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483)
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   Zeta Cluster Mode (2 instances)
   
   ### Java or Scala Version
   
   OpenJDK Runtime Environment (build 1.8.0_392-8u392-ga-1~20.04-b08)
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to