Adamyuanyuan opened a new issue, #9826:
URL: https://github.com/apache/seatunnel/issues/9826

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   The underlying engine is Flink. When writing to Hive from Kafka in 
**Streaming mode**, the function is found to be unavailable. However, in 
**Batch mode**, data can be written to Hive normally.  
   
   
   
   ### 1. Checkpoint Failure Issue
   When the data volume to be written is large, writing to HDFS fails. The 
specific performance under different data throughput is as follows:  
   
   | Records per Second | Hive-Parquet | Hive-Text |
   | --- | --- | --- |
   | 5 records/sec | Normal | Normal |
   | 100 records/sec | Occasional failure (once every 8 minutes) | Normal |
   | 1000 records/sec | Fails (with a similar frequency to the 100 records/sec 
scenario) | Fails |
   
   <img width="2638" height="1496" alt="Image" 
src="https://github.com/user-attachments/assets/2a05365a-81db-4fbb-bbc4-84d408c021e0";
 />
   
   <img width="2554" height="1118" alt="Image" 
src="https://github.com/user-attachments/assets/4f0ea1fa-1edf-4f47-8cc3-5a5a7554b6da";
 />
   
   
   ### 2. Data Loss Issue
   + **File Sink/Hive Sink** may lose data, or fail to commit with the error 
"source missing" during the renaming process.  
   + In addition, we have also observed logs of "already finished … skip" from 
idempotent paths.
   
   
   
   I found that someone has already reported this issue, which may all be 
related to this bug. However, no one has resolved them yet and they have all 
been closed:  
   
   + https://github.com/apache/seatunnel/issues/8058
   + https://github.com/apache/seatunnel/issues/5952
   + https://github.com/apache/seatunnel/issues/8056
   + https://github.com/apache/seatunnel/issues/9469
   
   
   ### Affected Modules
   Under the **Flink Streaming** scenario:  
   + All file write operations (including Hive write operations)  
   + Connectors: `connector-file`, `connector-hive`
   
   
   ### Root cause
   - After prepareCommit, writers may still write into the same transaction 
directory before snapshotState. The committer renames listed files and deletes 
the transaction directory, which can accidentally remove files intended for the 
next commit. Additionally, rename lacks short visibility wait and strict 
idempotency checks.
   
   ### Proposal
   
   - Writer rotates to a new transaction immediately after prepareCommit; 
snapshotState skips double rotation via a flag.
   - Committer only deletes transaction directory when empty (ignoring 
hidden/system files); otherwise warn and skip.
   - HadoopFileSystemProxy.renameFile adds a short visibility wait and treats 
“target exists” as idempotent success; on real missing, prints diagnostics.
   
   
   ### SeaTunnel Version
   
   2.3.11,2.3.9, and all
   
   ### SeaTunnel Config
   
   ```conf
   env {
     #运行并行度
     execution.parallelism = 1
     #流/批次作业
     job.mode = "STREAMING"
     #ck时间间隔
     checkpoint.interval = 60000
   }
   
   source {
     Kafka {
       schema = {
         fields {
           id = bigint
           page_no = bigint
           ...
         }
       }
       format = text
       field_delimiter = "\\|"
       topic = "push_report_event"
       bootstrap.servers = "..."
       consumer.group = "push_report_event"
       kafka.config = {
         max.poll.records = 100
         auto.offset.reset = "earliest"
         enable.auto.commit = "false"
       }
         result_table_name = "result_table"
     }  
   }
   
   transform {
     Sql {
       #来源结果集表名
       source_table_name = "result_table"
       #目标结果集表名
       result_table_name = "source_table"
       query = "select id, page_no..."
      }
   }
   
   sink {
     Hive {
         table_name = "gang_test.kafka_source_text"
         metastore_uri = "thrift://...:9083"
         hdfs_site_path = "datasource-conf/.../hdfs-site.xml"
         hive_site_path = "datasource-conf/.../hive-site.xml"
         krb5_path = "datasource-conf/.../krb5.conf"
         kerberos_principal = "hive/bigdata-..@MR......"
         kerberos_keytab_path = "datasource-conf/.../hive.keytab"
         overwrite = false
         # compress_codec = "SNAPPY"
         tmp_path = 
"hdfs://.../.../hive/warehouse/_seatunnel_tmp_kafka_source_text"
         batch_size = 5000
         batch_interval_ms = 15000
         source_table_name = "source_table"
         file_name_expression = "${now}"
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   not need
   ```
   
   ### Error Exception
   
   ```log
   java.lang.Exception: Could not perform checkpoint 1305 for operator Source: 
Kafka-Source -> Flat Map -> MultiTableSink-Sink: Writer -> MultiTableSink-Sink: 
Committer (1/1)#0.
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1166)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1113)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: 
java.util.concurrent.ExecutionException: 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException:
 ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink 
connector failed] - Flush data to this file 
[hdfs://ctyunns/user/hive/warehouse/_seatunnel_tmp_kafka_source_text/seatunnel/3a5c6f4714884746bc2f5c59d660abd9/f9351a0e5d/T_3a5c6f4714884746bc2f5c59d660abd9_f9351a0e5d_0_1/NON_PARTITION/T_3a5c6f4714884746bc2f5c59d660abd9_f9351a0e5d_0_1_148.txt]
 failed
        at 
org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:276)
        at 
org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter.prepareCommit(FlinkSinkWriter.java:104)
        at 
org.apache.flink.streaming.api.transformations.SinkV1Adapter$SinkWriterV1Adapter.prepareCommit(SinkV1Adapter.java:151)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
        at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:334)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1154)
        ... 14 more
   ```
   
   ### Zeta or Flink or Spark Version
   
   Flink 1.16 
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to