Adamyuanyuan opened a new pull request, #9828: URL: https://github.com/apache/seatunnel/pull/9828
https://github.com/apache/seatunnel/issues/9826

### Purpose of this pull request

Fix: the Flink streaming file sink may lose data because of a race between `prepareCommit` and later writes, combined with aggressive cleanup of the transaction directory.

The underlying engine is Flink. When writing from Kafka to Hive in **Streaming mode**, writing was found not to work. In **Batch mode**, however, data is written to Hive normally.

#### 1. Checkpoint Failure Issue

When the data volume to be written is large, writing to HDFS fails. The behavior observed at different throughputs is as follows:

| Records per Second | Hive-Parquet | Hive-Text |
| --- | --- | --- |
| 5 records/sec | Normal | Normal |
| 100 records/sec | Occasional failure (once every 8 minutes) | Normal |
| 1000 records/sec | Fails (with a similar frequency to the 100 records/sec scenario) | Fails |

<img width="2638" height="1496" alt="Image" src="https://github.com/user-attachments/assets/2a05365a-81db-4fbb-bbc4-84d408c021e0" />
<img width="2554" height="1118" alt="Image" src="https://github.com/user-attachments/assets/4f0ea1fa-1edf-4f47-8cc3-5a5a7554b6da" />

#### 2. Data Loss Issue

+ **File Sink/Hive Sink** may lose data, or fail to commit with the error "source missing" during the renaming process.
+ In addition, we have also observed "already finished … skip" log messages from idempotent paths.

### Affected Modules

Under the **Flink Streaming** scenario:

+ All file write operations (including Hive write operations)
+ Connectors: `connector-file`, `connector-hive`

### Root cause

- After `prepareCommit`, writers may still write into the same transaction directory before `snapshotState`. The committer renames the listed files and then deletes the transaction directory, which can accidentally remove files intended for the next commit. Additionally, rename lacks a short visibility wait and strict idempotency checks.

### Proposal

- The writer rotates to a new transaction immediately after `prepareCommit`; `snapshotState` uses a flag to skip a second rotation.
- The committer deletes the transaction directory only when it is empty (ignoring hidden/system files); otherwise it logs a warning and skips the deletion.
- `HadoopFileSystemProxy.renameFile` adds a short visibility wait and treats “target exists” as idempotent success; when the source is really missing, it prints diagnostics.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Large-scale testing in the test environment: no checkpoint failed and no data was lost.

<img width="3200" height="1662" alt="image" src="https://github.com/user-attachments/assets/6d52aa1a-427b-4609-924c-8aa47eee7cd8" />
<img width="2682" height="1486" alt="image" src="https://github.com/user-attachments/assets/1f6a8181-5501-4edc-b3cb-b47173a039f4" />
<img width="2632" height="1514" alt="image" src="https://github.com/user-attachments/assets/8587dea1-5794-4dd5-9903-94ce039d3f3e" />

### Check list

* [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If you are contributing the connector code, please check that the following files are updated:
  1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
  2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
  3. Add a CI label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
  4. Add an e2e test case in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
  5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
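
### Illustrative sketch of the proposal

The three changes listed under Proposal can be sketched roughly as follows. This is not the code in this PR: it uses local `java.nio.file` paths instead of the Hadoop file system, and the names `RotatingWriterSketch`, `CommitterSketch`, `deleteIfEmpty`, and `renameIdempotent` are hypothetical. It also assumes that "hidden/system files" means names starting with `.` or `_`, and that such entries are plain files.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical writer that only tracks which transaction is currently open. */
class RotatingWriterSketch {
    private long txnId = 0;
    private boolean rotatedByPrepare = false;

    /** Name of the transaction directory that a record written now would land in. */
    String currentTxnDir() {
        return "T_" + txnId;
    }

    /** Closes the current transaction and opens the next one right away. */
    String prepareCommit() {
        String closed = currentTxnDir();
        txnId++;
        rotatedByPrepare = true;
        return closed;
    }

    /** Rotates only if prepareCommit has not already rotated for this checkpoint. */
    void snapshotState() {
        if (!rotatedByPrepare) {
            txnId++;
        }
        rotatedByPrepare = false;
    }
}

class CommitterSketch {
    /** Assumption: names starting with '.' or '_' count as hidden/system files. */
    static boolean isHidden(Path p) {
        String name = p.getFileName().toString();
        return name.startsWith(".") || name.startsWith("_");
    }

    /** Deletes txnDir only when no visible entries remain; returns true if it was deleted. */
    static boolean deleteIfEmpty(Path txnDir) throws IOException {
        List<Path> entries;
        try (Stream<Path> s = Files.list(txnDir)) {
            entries = s.collect(Collectors.toList());
        }
        for (Path p : entries) {
            if (!isHidden(p)) {
                System.err.println("WARN: " + txnDir + " still holds " + p.getFileName() + ", skipping delete");
                return false;
            }
        }
        for (Path p : entries) {
            Files.delete(p); // only hidden plain files are left at this point
        }
        Files.delete(txnDir);
        return true;
    }

    /**
     * Moves src to dst. If src is missing but dst already exists, the rename is
     * treated as already done. Otherwise waits briefly and retries; returns false
     * when neither path shows up within the given number of attempts.
     */
    static boolean renameIdempotent(Path src, Path dst, int attempts, long waitMs)
            throws IOException, InterruptedException {
        for (int i = 0; i < attempts; i++) {
            if (Files.exists(src)) {
                Files.move(src, dst);
                return true;
            }
            if (Files.exists(dst)) {
                return true; // committed by an earlier attempt
            }
            Thread.sleep(waitMs); // short visibility wait before the next check
        }
        return false;
    }
}
```

With this shape, records that arrive between `prepareCommit` and `snapshotState` go to the next transaction directory, so the committer's cleanup of the closed directory cannot touch them. A `false` result from `renameIdempotent` corresponds to the "real missing" case, where the actual change prints diagnostics.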
