Adamyuanyuan opened a new pull request, #10279:
URL: https://github.com/apache/seatunnel/pull/10279
### Purpose of this pull request
When running SeaTunnel on Flink in **STREAMING** mode with Hive sink
`overwrite: true`, the final Hive partition/table directory may lose previously
committed files and end up containing only a subset of data (often only files
from the last checkpoint).
### Reproduction
- Use a pipeline such as:
  - `env.job.mode = "STREAMING"`
  - Hive sink: `overwrite: true`
  - Constant partition (e.g. a SQL transform adds `'2025-12-16' as pt`, so all records go to the same partition)
- Observe the job logs: every completed checkpoint triggers an aggregated commit, and the same target partition directory is deleted each time before the new files are renamed/moved into it.
- Result: the partition directory is cleared on every checkpoint, so only the newest checkpoint’s files remain.
### Root Cause
`overwrite: true` is normalized to `DataSaveMode.DROP_DATA`. In
`HiveSinkAggregatedCommitter#commit(...)`, the implementation deleted the
target table/partition directories **before every commit**. In Flink streaming,
`commit()` is invoked after **every completed checkpoint**, so the delete step
was executed repeatedly and wiped files committed by earlier checkpoints.
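
To make the failure mode concrete, here is a toy in-memory model of the old behavior. It is only a sketch: `NaiveOverwriteDemo`, the `warehouse` map, and the file names are hypothetical and are not taken from the SeaTunnel code base. Each `commit` call stands for one completed checkpoint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of "delete before every commit"; NOT the real HiveSinkAggregatedCommitter. */
class NaiveOverwriteDemo {
    // Hypothetical stand-in for the file system: partition directory -> visible files.
    private final Map<String, List<String>> warehouse = new HashMap<>();

    /** Mimics the old behavior: clear the target directory, then move the new files in. */
    void commit(String partitionDir, List<String> newFiles) {
        warehouse.remove(partitionDir); // the delete step that ran on every checkpoint
        warehouse.computeIfAbsent(partitionDir, k -> new ArrayList<>()).addAll(newFiles);
    }

    List<String> list(String partitionDir) {
        return warehouse.getOrDefault(partitionDir, new ArrayList<>());
    }

    public static void main(String[] args) {
        NaiveOverwriteDemo demo = new NaiveOverwriteDemo();
        // Checkpoint 1 and checkpoint 2 both write to the same constant partition.
        demo.commit("pt=2025-12-16", Arrays.asList("ckpt1-a.orc", "ckpt1-b.orc"));
        demo.commit("pt=2025-12-16", Arrays.asList("ckpt2-a.orc"));
        // The files from checkpoint 1 are gone.
        System.out.println(demo.list("pt=2025-12-16")); // prints [ckpt2-a.orc]
    }
}
```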
### Fix
Implemented overwrite semantics that are safe for streaming checkpoints:
- Delete each target directory (table directory or partition directory) **at
most once per job attempt**, and only when the commit contains actual files
(skip deletion for empty commits).
- Best-effort recovery protection: parse the `checkpointId` from
`transactionDir` (pattern like `.../T_xxx_<subtaskIndex>_<checkpointId>`). If
the first checkpoint id seen by this committer is `> 1` (which usually indicates
recovery from a previous checkpoint), skip deletion to avoid removing
already-committed data that matches the restored state.
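
The two rules above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in this PR: `OverwriteGuard`, `shouldDelete`, and `parseCheckpointId` are hypothetical names, the regular expression assumes the transaction directory ends with `_<subtaskIndex>_<checkpointId>`, and recording the first checkpoint id even on an empty commit is one possible reading of "first checkpoint id seen".

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the delete-once and recovery rules. */
class OverwriteGuard {
    // Assumption: transactionDir ends with "_<subtaskIndex>_<checkpointId>".
    private static final Pattern TXN_SUFFIX = Pattern.compile("_(\\d+)_(\\d+)/?$");

    // Directories already cleared during this job attempt.
    private final Set<String> deletedDirs = new HashSet<>();
    // First checkpoint id this committer instance has seen; null until one is parsed.
    private Long firstCheckpointId;

    /** Returns the checkpoint id, or -1 if the path does not match the assumed pattern. */
    static long parseCheckpointId(String transactionDir) {
        Matcher m = TXN_SUFFIX.matcher(transactionDir);
        return m.find() ? Long.parseLong(m.group(2)) : -1L;
    }

    /** Decides whether targetDir should be deleted before this commit moves files in. */
    boolean shouldDelete(String targetDir, String transactionDir, int fileCount) {
        long checkpointId = parseCheckpointId(transactionDir);
        if (firstCheckpointId == null && checkpointId >= 0) {
            firstCheckpointId = checkpointId;
        }
        if (fileCount == 0) {
            return false; // empty commit: nothing to overwrite with
        }
        if (firstCheckpointId != null && firstCheckpointId > 1) {
            return false; // likely restored from a checkpoint: keep committed data
        }
        // Set.add returns true only the first time, so each directory is deleted at most once.
        return deletedDirs.add(targetDir);
    }
}
```

Keeping the state in memory means it resets with every job attempt, which is why the checkpoint-id check is needed as a separate, best-effort guard after a restart.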
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Yes. Covered by unit tests (UT) and verified in our test environment.
### Check list
* [ ] If your PR adds any new Jar binary package, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If necessary, please update `incompatible-changes.md` to describe the incompatibility caused by this PR.
* [ ] If you are contributing the connector code, please check that the following files are updated:
  1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
  2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
  3. Add ci label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
  4. Add e2e testcase in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
  5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)