CloverDew opened a new pull request, #10502: URL: https://github.com/apache/seatunnel/pull/10502
### Purpose of this pull request

Implement dirty data collection framework to record and handle dirty records across SeaTunnel sinks, enabling configurable collectors, validators, SPI extensions, and threshold-based failure policies.

**Design from:** https://github.com/apache/seatunnel/issues/4587

### Summary of changes

- Add `DirtyRecordCollector` interface with overloads, init, validation integration, and threshold support.
- Implement `LogDirtyRecordCollector`, `NoOpDirtyRecordCollector`, and example SPI collector `CountingDirtyRecordCollector` with providers.
- Add `DirtyDataValidator` SPI and `ValidatingDirtyRecordCollector` wrapper to support user-defined validation rules.
- Add `DirtyRecordCollectorFactory` and `DirtyCollectorConfigProcessor` to merge env and sink configs, instantiate collectors/validators and inject into sink writer context.
- Integrate collector initialization and propagation into engine/translation layers so writer contexts receive a collector instance.
- Add unit and e2e tests and example configs for built-in and custom collectors/validators.

### Architecture

- Config-driven: collector and validator configured under `env.dirty.collector` / `env.dirty.validator` or per-sink `dirty.collector` / `dirty.validator`. `DirtyCollectorConfigProcessor` merges env and sink configs and prefers sink-specific config.
- SPI extensibility: collectors and validators discovered via Factory/SPI. Built-in "log" collector provided; custom types (e.g., "counting") can be registered via ServiceLoader.
- Validation pipeline: when a `dirty.validator` is configured, a `ValidatingDirtyRecordCollector` wraps the collector and calls `validator.validate(record, catalogTable)`; records flagged dirty are collected by the delegate.
- Runtime behavior: sinks call `context.getDirtyRecordCollector().collect(...)` on write failures; writers may call `validateAndCollectIfDirty(...)` before writing. Collectors track counts and enforce thresholds.
- Serialization/Injection: collector objects are instantiated and injected into sink writer

**Configuration Example:**

```
env {
  parallelism = 1
  job.mode = "BATCH"
  dirty.collector = {
    type = "log"
    log_level = "ERROR"
    threshold = 10
    fail_on_threshold = true
  }
  dirty.validator = {
    type = "AlwaysDirtyDataValidator"
  }
}

source {
  FakeSource {
    row.num = 100
    schema = {
      fields {
        id = "bigint"
        name = "string"
        age = "int"
        status = "string"
      }
    }
    plugin_output = "fake"
  }
}

sink {
  Console {
    plugin_input = "fake"
  }
}
```

### Does this PR introduce _any_ user-facing change?

Yes. Previously, there was no unified dirty data collection capability; this PR introduces a configurable dirty data collection framework, allowing users to record and process dirty data via configuration or SPI-enabled collectors/validators.

**Major Changes:** Added configuration items `env.dirty.collector` and `env.dirty.validator` (currently, the relevant configuration has been added only for ConsoleSink; the other sinks need to be implemented manually); provided built-in log and example counting collectors; supports custom collectors/validators via SPI.

**Compatibility and Default Behavior:** `NoOpDirtyRecordCollector` is used when no collector is configured. When a threshold is set, the configuration determines whether the task fails once the threshold is exceeded.

### How was this patch tested?

Unit tests for config merging, factory discovery, and basic collector behaviors.

E2E tests `DirtyDataCollectionIT` and `DirtyDataCustomExtensionIT` for:

- [ ] Built-in log collector.
- [ ] Custom SPI collector/validator usage.
- [ ] Threshold and fail-on-threshold behavior.

### Check list

* [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new feature.
  https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If necessary, please update `incompatible-changes.md` to describe the incompatibility caused by this PR.
* [ ] If you are contributing the connector code, please check that the following files are updated:
  1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
  2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
  3. Add a CI label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
  4. Add an e2e test case in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
  5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
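
For readers who want a concrete picture of the validation pipeline and threshold behavior described under Architecture, here is a minimal, self-contained sketch. It is not the code in this PR: `Collector`, `ThresholdCollector`, `ValidatingCollector`, and `mergeConfig` are hypothetical stand-ins, records are plain strings, and two behaviors are assumptions rather than facts from the description: that the config merge is key-wise with sink entries overriding env entries, and that "exceeding" the threshold means a count strictly greater than it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

// Hypothetical sketch: every type and method name here is an illustrative
// stand-in, not one of the interfaces added by the PR.
final class DirtyCollectorSketch {

    /** Simplified stand-in for a dirty record collector. */
    interface Collector {
        void collect(String record, String reason);

        long count();
    }

    /** Counts dirty records and optionally fails once the count exceeds the threshold. */
    static final class ThresholdCollector implements Collector {
        private final long threshold;
        private final boolean failOnThreshold;
        private final AtomicLong count = new AtomicLong();

        ThresholdCollector(long threshold, boolean failOnThreshold) {
            this.threshold = threshold;
            this.failOnThreshold = failOnThreshold;
        }

        @Override
        public void collect(String record, String reason) {
            long seen = count.incrementAndGet();
            // Assumption: "exceeding" means strictly greater than the threshold.
            if (failOnThreshold && seen > threshold) {
                throw new IllegalStateException(
                        "dirty record count " + seen + " exceeded threshold " + threshold);
            }
        }

        @Override
        public long count() {
            return count.get();
        }
    }

    /** Runs a validator first and forwards only records flagged dirty to the delegate. */
    static final class ValidatingCollector implements Collector {
        private final Collector delegate;
        private final Predicate<String> isDirty;

        ValidatingCollector(Collector delegate, Predicate<String> isDirty) {
            this.delegate = delegate;
            this.isDirty = isDirty;
        }

        /** Returns true if the record was flagged dirty and handed to the delegate. */
        boolean validateAndCollectIfDirty(String record) {
            if (isDirty.test(record)) {
                delegate.collect(record, "failed validation");
                return true;
            }
            return false;
        }

        @Override
        public void collect(String record, String reason) {
            delegate.collect(record, reason);
        }

        @Override
        public long count() {
            return delegate.count();
        }
    }

    /** Assumption: key-wise merge in which sink-level entries override env-level ones. */
    static Map<String, Object> mergeConfig(Map<String, Object> env, Map<String, Object> sink) {
        Map<String, Object> merged = new HashMap<>(env);
        merged.putAll(sink);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> env = new HashMap<>();
        env.put("type", "log");
        env.put("threshold", 10);
        Map<String, Object> sink = new HashMap<>();
        sink.put("threshold", 2);
        Map<String, Object> cfg = mergeConfig(env, sink);

        long threshold = ((Number) cfg.get("threshold")).longValue();
        ValidatingCollector collector = new ValidatingCollector(
                new ThresholdCollector(threshold, true),
                String::isEmpty); // toy rule: empty records are dirty

        try {
            for (String record : new String[] {"a", "", "b", "", ""}) {
                // A real writer would write the record when this returns false.
                collector.validateAndCollectIfDirty(record);
            }
        } catch (IllegalStateException e) {
            System.out.println("job failed: " + e.getMessage());
        }
        System.out.println("dirty records seen: " + collector.count());
    }
}
```

In this sketch the sink-level threshold of 2 overrides the env-level 10, so the third dirty record triggers the failure. Whether the real implementation fails on reaching or on exceeding the threshold should be checked against the PR's code.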
