CloverDew opened a new pull request, #10502:
URL: https://github.com/apache/seatunnel/pull/10502

   ### Purpose of this pull request
   Implement a dirty data collection framework that records and handles dirty records across SeaTunnel sinks, with configurable collectors and validators, SPI extensions, and threshold-based failure policies.
   **Design from:** https://github.com/apache/seatunnel/issues/4587
   
   ### Summary of changes
   
   - Add a `DirtyRecordCollector` interface with method overloads, initialization, validation integration, and threshold support.
   
   - Implement `LogDirtyRecordCollector`, `NoOpDirtyRecordCollector`, and 
example SPI collector `CountingDirtyRecordCollector` with providers.
   
   - Add `DirtyDataValidator` SPI and `ValidatingDirtyRecordCollector` wrapper 
to support user-defined validation rules.
   
   - Add `DirtyRecordCollectorFactory` and `DirtyCollectorConfigProcessor` to merge env- and sink-level configs, instantiate collectors/validators, and inject them into the sink writer context.
   
   - Integrate collector initialization and propagation into engine/translation 
layers so writer contexts receive a collector instance.
   
   - Add unit and e2e tests and example configs for built-in and custom 
collectors/validators.
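To make the collector/factory relationship above concrete, here is a minimal sketch in plain Java. All names in it (`RecordCollector`, `NoOpRecordCollector`, `LogRecordCollector`, `RecordCollectorFactory`) are hypothetical stand-ins rather than the PR's actual classes, and a `Map` of suppliers stands in for Factory/SPI discovery. It only illustrates the intended behavior: when no type is configured a no-op collector is returned, and a known type such as `log` yields the matching implementation. Rejecting unknown types with an exception is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified collector contract; the PR's real
// DirtyRecordCollector has its own signatures.
interface RecordCollector {
    // Record one dirty row together with the reason it was rejected.
    void collect(Object record, String reason);

    // Number of dirty records seen so far.
    long dirtyCount();
}

// Default when no dirty.collector is configured: silently drops records.
final class NoOpRecordCollector implements RecordCollector {
    @Override
    public void collect(Object record, String reason) {
        // intentionally does nothing
    }

    @Override
    public long dirtyCount() {
        return 0L;
    }
}

// "log"-style collector: prints each dirty record and counts it.
final class LogRecordCollector implements RecordCollector {
    private long count;

    @Override
    public void collect(Object record, String reason) {
        count++;
        System.out.println("dirty record #" + count + " (" + reason + "): " + record);
    }

    @Override
    public long dirtyCount() {
        return count;
    }
}

// Stand-in for SPI discovery: a type name maps to a constructor.
// The PR uses Factory/SPI (ServiceLoader) discovery instead of this map.
final class RecordCollectorFactory {
    private final Map<String, Supplier<RecordCollector>> registry = new HashMap<>();

    RecordCollectorFactory() {
        registry.put("log", LogRecordCollector::new);
    }

    // Custom collectors register under their own type name.
    void register(String type, Supplier<RecordCollector> supplier) {
        registry.put(type, supplier);
    }

    RecordCollector create(String type) {
        if (type == null) {
            return new NoOpRecordCollector(); // nothing configured
        }
        Supplier<RecordCollector> supplier = registry.get(type);
        if (supplier == null) {
            throw new IllegalArgumentException("Unknown dirty collector type: " + type);
        }
        return supplier.get();
    }
}
```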
   
   ### Architecture
   
   - Config-driven: the collector and validator are configured under `env.dirty.collector` / `env.dirty.validator`, or per sink under `dirty.collector` / `dirty.validator`. `DirtyCollectorConfigProcessor` merges the env and sink configs, giving precedence to the sink-specific config.
   
   - SPI extensibility: collectors and validators are discovered via Factory/SPI. A built-in `log` collector is provided; custom types (e.g., `counting`) can be registered via `ServiceLoader`.
   
   - Validation pipeline: when a `dirty.validator` is configured, a `ValidatingDirtyRecordCollector` wraps the collector and calls `validator.validate(record, catalogTable)`; records flagged as dirty are collected by the delegate.
   
   - Runtime behavior: sinks call `context.getDirtyRecordCollector().collect(...)` on write failures; writers may call `validateAndCollectIfDirty(...)` before writing. Collectors track counts and enforce thresholds.
   
   - Serialization/Injection: collector objects are instantiated and injected into the sink writer context.
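The validation pipeline described above is essentially a decorator around the collector. Below is a minimal sketch under simplifying assumptions: a `Map<String, Object>` stands in for the row, a table-name `String` stands in for `catalogTable`, and the validator returns `null` for a clean row or a reason string for a dirty one. `RowValidator`, `RowCollector`, `InMemoryCollector` and `ValidatingCollector` are hypothetical names; only `validateAndCollectIfDirty` is borrowed from the description, and its boolean return value is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical validator: null means clean, otherwise the reason it is dirty.
interface RowValidator {
    String validate(Map<String, Object> row, String table);
}

// Hypothetical, simplified collector contract.
interface RowCollector {
    void collect(Map<String, Object> row, String reason);
}

// Keeps reasons in memory so the example can inspect what was collected.
final class InMemoryCollector implements RowCollector {
    final List<String> reasons = new ArrayList<>();

    @Override
    public void collect(Map<String, Object> row, String reason) {
        reasons.add(reason);
    }
}

// Decorator: runs the validator first and forwards dirty rows to the delegate.
final class ValidatingCollector implements RowCollector {
    private final RowValidator validator;
    private final RowCollector delegate;

    ValidatingCollector(RowValidator validator, RowCollector delegate) {
        this.validator = validator;
        this.delegate = delegate;
    }

    // Explicit collection (e.g. after a write failure) goes straight through.
    @Override
    public void collect(Map<String, Object> row, String reason) {
        delegate.collect(row, reason);
    }

    // Returns true when the row was dirty (and was therefore collected),
    // so the writer can skip writing it.
    boolean validateAndCollectIfDirty(Map<String, Object> row, String table) {
        String reason = validator.validate(row, table);
        if (reason == null) {
            return false;
        }
        delegate.collect(row, reason);
        return true;
    }
}
```

Returning a boolean lets a writer skip a dirty row instead of writing it; the real method may signal this differently.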
   
   **Configuration Example:**
   ```
   env {
     parallelism = 1
     job.mode = "BATCH"
   
     dirty.collector = {
       type = "log"
       log_level = "ERROR"
       threshold = 10
       fail_on_threshold = true
     }
     
     dirty.validator = {
       type = "AlwaysDirtyDataValidator"
     }
   }
   
   source {
     FakeSource {
       row.num = 100
       schema = {
         fields {
           id = "bigint"
           name = "string"
           age = "int"
           status = "string"
         }
       }
       plugin_output = "fake"
     }
   }
   
   sink {
     Console {
       plugin_input = "fake"
     }
   }
   ```
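The example above configures the collector only at the `env` level. A minimal sketch of how `DirtyCollectorConfigProcessor` might combine env- and sink-level settings, assuming a key-by-key merge in which sink values override env values (the real processor could instead replace the whole block). `DirtyConfigMerge` is a hypothetical name, and a flat `Map` stands in for the parsed HOCON object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical merge of env-level and sink-level dirty.collector settings.
final class DirtyConfigMerge {
    private DirtyConfigMerge() {
    }

    // Env values are copied first; sink values are copied afterwards,
    // so any key set at the sink level wins.
    static Map<String, Object> merge(Map<String, Object> env, Map<String, Object> sink) {
        Map<String, Object> merged = new HashMap<>();
        if (env != null) {
            merged.putAll(env);
        }
        if (sink != null) {
            merged.putAll(sink);
        }
        return merged;
    }
}
```

For instance, under this merge strategy a sink that only sets `threshold = 5` would inherit `type = "log"` and `fail_on_threshold = true` from `env`.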
   
   ### Does this PR introduce _any_ user-facing change?
    
   Yes. Previously, there was no unified dirty data collection capability; this 
PR introduces a configurable dirty data collection framework, allowing users to 
record and process dirty data via configuration or SPI-enabled 
collectors/validators.
   
   **Major Changes:** Added the configuration items `env.dirty.collector` and `env.dirty.validator` (currently, the relevant configuration has only been added for ConsoleSink; other sinks still need to be wired up manually); provided a built-in log collector and an example counting collector; custom collectors/validators are supported via SPI.
   
   **Compatibility and Default Behavior:** `NoOpDirtyRecordCollector` is used when no collector is configured. When a threshold is set, `fail_on_threshold` determines whether the task fails once the threshold is exceeded.
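A minimal sketch of the threshold rule, assuming that "exceeding" means the dirty count goes strictly above `threshold` and that a non-positive threshold disables the check; both points are assumptions, not confirmed behavior. `ThresholdCounter` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical threshold check mirroring `threshold` / `fail_on_threshold`.
final class ThresholdCounter {
    private final long threshold;
    private final boolean failOnThreshold;
    // AtomicLong so concurrent writer threads could share one counter.
    private final AtomicLong count = new AtomicLong();

    ThresholdCounter(long threshold, boolean failOnThreshold) {
        this.threshold = threshold;
        this.failOnThreshold = failOnThreshold;
    }

    // Call once per dirty record; throws when the task should fail.
    void onDirtyRecord() {
        long n = count.incrementAndGet();
        // Assumption: count strictly above threshold fails; threshold <= 0 disables.
        if (failOnThreshold && threshold > 0 && n > threshold) {
            throw new IllegalStateException(
                    "Dirty record count " + n + " exceeded threshold " + threshold);
        }
    }

    long count() {
        return count.get();
    }
}
```

Under these assumptions, the example configuration (`threshold = 10`, `fail_on_threshold = true`) would fail the task on the 11th dirty record, while with `fail_on_threshold = false` the count would keep growing without failing the task.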
   
   
   ### How was this patch tested?
   
   Unit tests for config merging, factory discovery and basic collector 
behaviors.
   E2E tests `DirtyDataCollectionIT`, `DirtyDataCustomExtensionIT` for:
   
   - [ ] Built-in log collector.
   - [ ] Custom SPI collector/validator usage.
   - [ ] Threshold and fail-on-threshold behavior.
   
   
   ### Check list
   
   * [ ] If your PR adds any new JAR binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new 
feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If necessary, please update `incompatible-changes.md` to describe the 
incompatibility caused by this PR.
   * [ ] If you are contributing the connector code, please check that the 
following files are updated:
     1. Update 
[plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties)
 and add new connector information in it
     2. Update the pom file of 
[seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add ci label in 
[label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add e2e testcase in 
[seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update connector 
[plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)


-- 
This is an automated message from the Apache Git Service.