Adamyuanyuan opened a new issue, #9826: URL: https://github.com/apache/seatunnel/issues/9826
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

The underlying engine is Flink. When writing from Kafka to Hive in **Streaming mode**, the sink does not work reliably. In **Batch mode**, data is written to Hive normally.

### 1. Checkpoint Failure Issue

When the volume of data to be written is large, writing to HDFS fails. Behavior at different throughput levels:

| Records per Second | Hive-Parquet | Hive-Text |
| --- | --- | --- |
| 5 records/sec | Normal | Normal |
| 100 records/sec | Occasional failure (about once every 8 minutes) | Normal |
| 1000 records/sec | Fails (at a similar frequency to the 100 records/sec scenario) | Fails |

<img width="2638" height="1496" alt="Image" src="https://github.com/user-attachments/assets/2a05365a-81db-4fbb-bbc4-84d408c021e0" />

<img width="2554" height="1118" alt="Image" src="https://github.com/user-attachments/assets/4f0ea1fa-1edf-4f47-8cc3-5a5a7554b6da" />

### 2. Data Loss Issue

+ The **File Sink/Hive Sink** may lose data, or fail to commit with a "source missing" error during the renaming process.
+ In addition, we have observed "already finished … skip" log lines from the idempotent paths.

Similar issues have been reported before and may all be related to this bug; none were resolved and all have since been closed:

+ https://github.com/apache/seatunnel/issues/8058
+ https://github.com/apache/seatunnel/issues/5952
+ https://github.com/apache/seatunnel/issues/8056
+ https://github.com/apache/seatunnel/issues/9469

### Affected Modules

Under the **Flink Streaming** scenario:

+ All file write operations (including Hive write operations)
+ Connectors: `connector-file`, `connector-hive`

### Root cause

- After `prepareCommit`, writers may still write into the same transaction directory before `snapshotState`.
- The committer renames the listed files and then deletes the transaction directory, which can accidentally remove files intended for the next commit. In addition, the rename lacks a short visibility wait and strict idempotency checks.

### Proposal

- Writer: rotate to a new transaction immediately after `prepareCommit`; `snapshotState` skips the double rotation via a flag.
- Committer: delete the transaction directory only when it is empty (ignoring hidden/system files); otherwise warn and skip.
- `HadoopFileSystemProxy.renameFile`: add a short visibility wait and treat "target exists" as idempotent success; when the source is genuinely missing, print diagnostics.

### SeaTunnel Version

2.3.11, 2.3.9, and all versions

### SeaTunnel Config

```conf
env {
  # parallelism
  execution.parallelism = 1
  # streaming/batch job mode
  job.mode = "STREAMING"
  # checkpoint interval
  checkpoint.interval = 60000
}

source {
  Kafka {
    schema = {
      fields {
        id = bigint
        page_no = bigint
        ...
      }
    }
    format = text
    field_delimiter = "\\|"
    topic = "push_report_event"
    bootstrap.servers = "..."
    consumer.group = "push_report_event"
    kafka.config = {
      max.poll.records = 100
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
    result_table_name = "result_table"
  }
}

transform {
  Sql {
    # source result-set table name
    source_table_name = "result_table"
    # target result-set table name
    result_table_name = "source_table"
    query = "select id, page_no..."
  }
}

sink {
  Hive {
    table_name = "gang_test.kafka_source_text"
    metastore_uri = "thrift://...:9083"
    hdfs_site_path = "datasource-conf/.../hdfs-site.xml"
    hive_site_path = "datasource-conf/.../hive-site.xml"
    krb5_path = "datasource-conf/.../krb5.conf"
    kerberos_principal = "hive/bigdata-..@MR......"
    kerberos_keytab_path = "datasource-conf/.../hive.keytab"
    overwrite = false
    # compress_codec = "SNAPPY"
    tmp_path = "hdfs://.../.../hive/warehouse/_seatunnel_tmp_kafka_source_text"
    batch_size = 5000
    batch_interval_ms = 15000
    source_table_name = "source_table"
    file_name_expression = "${now}"
  }
}
```

### Running Command

```shell
not need
```

### Error Exception

```log
java.lang.Exception: Could not perform checkpoint 1305 for operator Source: Kafka-Source -> Flat Map -> MultiTableSink-Sink: Writer -> MultiTableSink-Sink: Committer (1/1)#0.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1166)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1113)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException:
java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - Flush data to this file [hdfs://ctyunns/user/hive/warehouse/_seatunnel_tmp_kafka_source_text/seatunnel/3a5c6f4714884746bc2f5c59d660abd9/f9351a0e5d/T_3a5c6f4714884746bc2f5c59d660abd9_f9351a0e5d_0_1/NON_PARTITION/T_3a5c6f4714884746bc2f5c59d660abd9_f9351a0e5d_0_1_148.txt] failed
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:276)
	at org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter.prepareCommit(FlinkSinkWriter.java:104)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$SinkWriterV1Adapter.prepareCommit(SinkV1Adapter.java:151)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:334)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1154)
	... 14 more
```

### Zeta or Flink or Spark Version

Flink 1.16

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!
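### Fix sketch: idempotent rename

The `renameFile` behavior described in the Proposal (short visibility wait, "target exists" treated as idempotent success, diagnostics on a real miss) can be sketched as below. This is only an illustration of the intended semantics: `java.nio.file` stands in for the HDFS `FileSystem` API, and the class name, polling interval, and wait parameter are hypothetical, not SeaTunnel's actual code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Sketch of the proposed rename semantics (local filesystem stands in for HDFS). */
public final class IdempotentRenameSketch {

    /**
     * Moves source to target. Returns true on success, including the
     * idempotent case where an earlier (retried) attempt already moved it.
     */
    public static boolean renameFile(Path source, Path target, long visibilityWaitMs)
            throws IOException, InterruptedException {
        // Short visibility wait: on HDFS a freshly closed file may not be
        // visible to a listing immediately, so poll briefly before giving up.
        long deadline = System.currentTimeMillis() + visibilityWaitMs;
        while (!Files.exists(source) && System.currentTimeMillis() < deadline) {
            Thread.sleep(50);
        }

        if (!Files.exists(source)) {
            if (Files.exists(target)) {
                // A previous attempt already renamed the file: idempotent success.
                return true;
            }
            // Genuinely missing: fail with diagnostics instead of a bare error.
            throw new IOException("rename failed, source missing: " + source
                    + " (target " + target + " is also absent)");
        }

        Files.createDirectories(target.getParent());
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        return true;
    }
}
```

The key property is that a retried commit must not fail merely because a previous attempt already moved the file; only the source-and-target-both-missing case is a real error.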
### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
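### Fix sketch: guarded transaction-directory cleanup

The committer-side "delete only when empty (ignoring hidden/system files); otherwise warn and skip" guard from the Proposal can be sketched the same way. Again, names are hypothetical and the local filesystem stands in for HDFS; the assumption that ignorable entries are `.`/`_`-prefixed plain files is mine, not SeaTunnel's.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Sketch: delete a transaction directory only when no visible files remain. */
public final class TransactionDirCleanupSketch {

    /** Returns true if the directory was deleted, false if cleanup was skipped. */
    public static boolean deleteIfEmpty(Path transactionDir) throws IOException {
        List<Path> markers = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(transactionDir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                // Hidden/system entries (e.g. "."- or "_"-prefixed markers) do not count.
                if (name.startsWith(".") || name.startsWith("_")) {
                    markers.add(entry);
                    continue;
                }
                // A visible file may belong to the next commit: warn and skip.
                System.err.println("skip delete, transaction dir not empty: "
                        + transactionDir + " (found " + name + ")");
                return false;
            }
        }
        for (Path marker : markers) {
            Files.delete(marker); // assumes markers are plain files
        }
        Files.delete(transactionDir);
        return true;
    }
}
```

Skipping instead of force-deleting is what prevents the committer from removing files that the writer produced for the next checkpoint's transaction.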
