zhangshenghang opened a new issue, #10196: URL: https://github.com/apache/seatunnel/issues/10196
### Search before asking

- [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

Currently, when SeaTunnel encounters an exception during the Transform/Sink phases (especially data quality issues such as field type mismatches or primary key conflicts), the common behavior is to throw the exception and fail the entire job.

**Proposal**: Introduce a Dead Letter Queue (DLQ) capability so that:

- Normal data continues flowing through the main pipeline
- Error data (row-level errors) is bypassed to a separate error sink for troubleshooting and reprocessing

## Use Cases

1. **Transform Phase**: JSON parsing failure, field type conversion failure, custom validation failure
2. **Sink Phase**: Primary key conflicts, field length exceeded, NOT NULL constraint violations

## Core Requirements

### 1. Row-Level vs System-Level Error Distinction

| Error Type | Examples | Behavior |
|------------|----------|----------|
| Row-Level | Constraint violation, type mismatch, JSON parse error | Bypass to error sink (configurable) |
| System-Level | DB connection failure, network issues, OOM | Fail-fast (unchanged) |

### 2. Three-Level Configuration Hierarchy

```
Plugin-level > Stage-level > Global
```

```hocon
env {
  error_handler {
    mode = "ROUTE"  # DISABLE | LOG | ROUTE
    sink {
      plugin_name = "Jdbc"
      error_table = "st_error_data"
    }
    max_error_ratio = 0.01      # Fail job if exceeded
    max_error_records = 100000
  }
}
```

### 3. Unified Error Table Schema

| Field | Type | Description |
|-------|------|-------------|
| error_stage | STRING | TRANSFORM / SINK |
| plugin_name | STRING | Plugin that failed |
| error_type | STRING | Error classification |
| error_message | STRING | Error details |
| original_data | STRING | Original row data (JSON) |
| occur_time | TIMESTAMP | Error timestamp |
| job_id | STRING | Job identifier |
| checkpoint_id | BIGINT | For batch tracking |

### 4. Performance Protection

- Async batch write for the error sink (non-blocking)
- Bounded queue with an overflow policy (FAIL / DROP / BLOCK)
- Error threshold control (ratio & count)

## Transform Error Propagation

When multiple transforms exist (A → B → C) and a row fails at Transform A:

- The failed row goes directly to the error handler
- It does NOT continue to B or C
- Other, valid rows flow on unaffected

## Error Sink Failure Policy

If the error sink itself fails → **fail the job immediately** (no fallback, keeping the semantics simple).

<img width="4936" height="16111" alt="Image" src="https://github.com/user-attachments/assets/cfcafdba-cb73-4386-aa5d-d4f11a85f16d" />

## 1) Main Pipeline

1. Data enters from the **Source** and sequentially passes through **Transform A → Transform B → Transform C → Sink**.
2. After each Transform executes, a check is performed: **success?**
   * **Yes (Success):** The row proceeds to the next operator (A→B→C).
   * **No (Failure):** The row enters the **Classify error** step and does not proceed to subsequent Transforms (the "short circuit": if A fails, the row must not reach B/C).
3. Sink writes are also checked: **Sink success?**
   * **Yes**: Continue normally.
   * **No**: Enter the **Classify error** step for classification.

## 2) Error Classification: Row-level vs. System-level

For an exception thrown by a Transform or Sink, first determine whether it is a **row-level error**:

* **System-level error**
  For example: DB connection failure, network failure, OOM, cluster unavailability, etc.
  → Proceed directly to **Fail-fast**: the task fails and exits (preserving existing semantics)
* **Row-level error**
  For example: type mismatch, JSON parsing failure, field length exceeding the limit, NOT NULL violation, primary key conflict, etc.
  → Enter the **Error Handler / DLQ** for bypass processing (other, valid rows in the main chain continue)

## 3) DLQ Processing (Error Handler / DLQ)

After a row enters the DLQ, the configured **Mode** decides how the row-level error is handled:

* **DISABLE**: Discard the erroneous row directly (main process continues)
* **LOG**: Log the error (main process continues)
* **ROUTE**: Build a unified error record and write it to the error sink (the actual DLQ)

In **ROUTE** mode:

1. **Build a unified error record**
   Wrap the bad row into a unified structure containing:
   `error_stage / plugin_name / error_type / error_message / original_data / occur_time / job_id / checkpoint_id`
2. **Update counters**
   Track the number of errors and the error ratio (error ratio & error count).
3. **Threshold exceeded?**
   * If `max_error_ratio` or `max_error_records` is exceeded, **fail fast** immediately (to prevent silently swallowing too much dirty data).
   * If not, proceed to the asynchronous write path.
4. **Enqueue to a bounded async queue**
   For performance protection, the main chain does not write to the error table synchronously; it first enqueues the error record.
5. **Queue overflow policy**
   * **FAIL**: Fail the job when the queue is full (most stringent)
   * **DROP**: Discard error records (main process continues)
   * **BLOCK**: Wait for space in the queue (creates back pressure on the main process)
6. **Async batch writer → error sink**
   Writes error records to the configured DLQ sink (e.g., a JDBC table) in batches in the background.
7. **Error sink OK?**
   * **Yes**: Ack complete.
   * **No**: **Fail-fast (error sink failure)**. Per the policy above, if the error sink itself fails, the task fails immediately; the semantics stay simple and clear.
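The ROUTE-mode flow above (classify → build a unified record → check thresholds → enqueue under an overflow policy) can be sketched roughly as follows. This is a language-agnostic Python sketch, not a SeaTunnel API: the class and method names are illustrative, and the classifier keys off Python exception types purely for demonstration; only the error-record field names come from the proposal's schema.

```python
import json
import queue
import time

# Exception types standing in for the two error classes (illustrative only).
ROW_LEVEL = (ValueError, TypeError, KeyError)                 # type mismatch, JSON parse error, ...
SYSTEM_LEVEL = (ConnectionError, TimeoutError, MemoryError)   # DB/network failure, OOM, ...

def is_row_level(exc):
    """Row-level errors are bypassed to the DLQ; everything else fails fast."""
    return isinstance(exc, ROW_LEVEL) and not isinstance(exc, SYSTEM_LEVEL)

class ErrorThresholdExceeded(Exception):
    pass

class DlqRouter:
    """ROUTE mode: build a unified error record, enforce thresholds,
    then hand off to a bounded queue drained by an async batch writer."""

    def __init__(self, max_error_ratio=0.01, max_error_records=100_000,
                 queue_size=10_000, overflow_policy="BLOCK"):
        self.max_error_ratio = max_error_ratio
        self.max_error_records = max_error_records
        self.overflow_policy = overflow_policy  # FAIL | DROP | BLOCK
        self.queue = queue.Queue(maxsize=queue_size)
        self.total_rows = 0
        self.error_rows = 0

    def on_success(self):
        self.total_rows += 1

    def on_error(self, stage, plugin, exc, row, job_id, checkpoint_id):
        if not is_row_level(exc):
            raise exc  # system-level: fail fast, preserving existing semantics
        self.total_rows += 1
        self.error_rows += 1
        record = {  # unified error table schema from the proposal
            "error_stage": stage,
            "plugin_name": plugin,
            "error_type": type(exc).__name__,
            "error_message": str(exc),
            "original_data": json.dumps(row),
            "occur_time": time.time(),
            "job_id": job_id,
            "checkpoint_id": checkpoint_id,
        }
        if (self.error_rows > self.max_error_records
                or self.error_rows / self.total_rows > self.max_error_ratio):
            raise ErrorThresholdExceeded(
                f"{self.error_rows} errors in {self.total_rows} rows")
        try:
            self.queue.put(record, block=(self.overflow_policy == "BLOCK"))
        except queue.Full:
            if self.overflow_policy == "FAIL":
                raise
            # DROP: silently discard this error record
```

A real implementation would drain `self.queue` from a background thread that batch-writes records to the configured error sink, and would fail the job on any sink write error, per the fail-fast policy above.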
## 4) Configuration Activation Order (3-Level Config Resolution)

The bottom-right corner of the diagram illustrates the configuration resolution priority:

**Plugin-level > Stage-level > Global**

The **Resolved Config** is injected into the Error Handler, so that within the same job:

* Enable/disable, thresholds, and the error sink can be set uniformly at the global level
* A specific stage or plugin can override them (for finer-grained control)

### Usage Scenario

_No response_

### Related issues

_No response_

### Are you willing to submit a PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
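The three-level resolution described in section 4) amounts to a priority-ordered merge of the `error_handler` blocks. A minimal sketch, assuming a per-key merge (a whole-block override would be an equally valid design choice); the function name and argument names are illustrative, not a committed config surface:

```python
def resolve_error_handler(global_cfg=None, stage_cfg=None, plugin_cfg=None):
    """Merge error_handler settings per key; higher-priority levels override
    lower ones (Plugin-level > Stage-level > Global)."""
    resolved = {}
    for level in (global_cfg, stage_cfg, plugin_cfg):  # apply in increasing priority
        if level:
            resolved.update(level)
    return resolved
```

For example, a plugin could switch its own handling to `LOG` while inheriting the global `max_error_ratio` unchanged.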
