zhangshenghang opened a new issue, #10196:
URL: https://github.com/apache/seatunnel/issues/10196

   ### Search before asking
   
   - [x] I had searched in the 
[feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
   
   
   Currently, when SeaTunnel encounters an exception during the Transform or Sink phase (especially data quality issues such as field type mismatches or primary key conflicts), the default behavior is to throw the exception and fail the entire job.
   
   **Proposal**: Introduce a Dead Letter Queue (DLQ) capability that allows:
   - Normal data continues flowing in the main pipeline
   - Error data (row-level errors) is bypassed to a separate error sink for 
troubleshooting and reprocessing
   
   ## Use Cases
   
   1. **Transform Phase**: JSON parsing failure, field type conversion failure, 
custom validation failure
   2. **Sink Phase**: Primary key conflicts, field length exceeded, NOT NULL 
constraint violations
   
   ## Core Requirements
   
   ### 1. Row-Level vs System-Level Error Distinction
   
   | Error Type | Examples | Behavior |
   |------------|----------|----------|
   | Row-Level | Constraint violation, type mismatch, JSON parse error | Bypass 
to error sink (configurable) |
   | System-Level | DB connection failure, network issues, OOM | Fail-fast 
(unchanged) |
   
   ### 2. Three-Level Configuration Hierarchy
   
   ```
   Plugin-level > Stage-level > Global
   ```
   
   ```hocon
   env {
     error_handler {
       mode = "ROUTE"              # DISABLE | LOG | ROUTE
       sink {
         plugin_name = "Jdbc"
         error_table = "st_error_data"
       }
       max_error_ratio = 0.01      # Fail job if exceeded
       max_error_records = 100000
     }
   }
   ```
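   A stage- or plugin-level block could then override the global settings. The exact syntax below is illustrative only, assuming per-plugin `error_handler` blocks are supported:

```hocon
transform {
  JsonPath {
    # Plugin-level override (hypothetical): log-and-continue for this transform only
    error_handler {
      mode = "LOG"
    }
  }
}

sink {
  Jdbc {
    # Plugin-level override (hypothetical): stricter record cap for this sink
    error_handler {
      mode = "ROUTE"
      max_error_records = 1000
    }
  }
}
```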
   
   ### 3. Unified Error Table Schema
   
   | Field | Type | Description |
   |-------|------|-------------|
   | error_stage | STRING | TRANSFORM / SINK |
   | plugin_name | STRING | Plugin that failed |
   | error_type | STRING | Error classification |
   | error_message | STRING | Error details |
   | original_data | STRING | Original row data (JSON) |
   | occur_time | TIMESTAMP | Error timestamp |
   | job_id | STRING | Job identifier |
   | checkpoint_id | BIGINT | For batch tracking |
   
   ### 4. Performance Protection
   
   - Async batch write for error sink (non-blocking)
   - Bounded queue with overflow policy (FAIL/DROP/BLOCK)
   - Error threshold control (ratio & count)
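   The bounded-queue protection could be sketched as follows (an illustrative sketch in Python, not SeaTunnel code; the policy names mirror the proposal):

```python
import queue

class QueueFullError(Exception):
    """Raised under the FAIL overflow policy when the queue is full."""

class BoundedErrorQueue:
    """Bounded queue shielding the main pipeline from a slow error sink."""

    def __init__(self, capacity, policy):
        self._q = queue.Queue(maxsize=capacity)
        self._policy = policy  # "FAIL" | "DROP" | "BLOCK"
        self.dropped = 0

    def offer(self, record):
        """Enqueue an error record according to the overflow policy."""
        if self._policy == "BLOCK":
            self._q.put(record)        # blocks when full: back-pressures the main chain
            return True
        try:
            self._q.put_nowait(record)
            return True
        except queue.Full:
            if self._policy == "FAIL":
                raise QueueFullError("error queue full, failing job")
            self.dropped += 1          # DROP: discard the record, main chain continues
            return False
```

An async batch writer would drain this queue in the background and flush to the error sink in batches.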
   
   ## Transform Error Propagation
   
   When multiple transforms exist (A → B → C), if data fails at Transform A:
   - Data goes directly to error handler
   - Data does NOT continue to B, C
   - Other normal data flows unaffected
   
   ## Error Sink Failure Policy
   
   If error sink itself fails → **Fail job immediately** (no fallback, keep 
semantics simple)
   
   <img width="4936" height="16111" alt="Image" src="https://github.com/user-attachments/assets/cfcafdba-cb73-4386-aa5d-d4f11a85f16d" />
   
   ## 1) Main Pipeline
   
   1. Data enters from the **Source** and sequentially passes through 
**Transform A → Transform B → Transform C → Sink**.
   
   2. After each Transform is executed, a check is performed: **success?**
   
   * **Yes (Success):** Data proceeds to the next operator (A→B→C).
   
   * **No (Failure):** The row is routed to the **Classify error** step and does not proceed to subsequent Transforms (if A fails, the row never reaches B or C, i.e. it is "short-circuited").
   
   3. Sink writes also check **Sink success?**
   
   * **Yes**: Continue normally
   
   * **No**: Enter **Classify error Sink** for classification
   
   ## 2) Error Classification: Row-level vs. System-level
   
   For each exception thrown by a Transform or Sink, first determine whether it is a **Row-level error**:
   
   * **System-level error**
   
   For example: DB connection failure, network failure, OOM, cluster 
unavailability, etc.
   
   → Directly proceed to **Fail-fast**: Task fails and exits (preserving 
existing semantics)
   
   * **Row-level error**
   
   For example: Type mismatch, JSON parsing failure, field length exceeding 
limit, NOT NULL violation, primary key conflict, etc.
   
   → Enter **Error Handler / DLQ** for bypass processing (other normal data in 
the main chain continues)
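   The split between the two categories could be sketched as follows (illustrative Python, not SeaTunnel code; the exception classes are placeholders for whatever the connectors actually throw):

```python
# Placeholder exception hierarchy for per-record failures (illustrative only).
class RowLevelError(Exception): ...
class TypeMismatchError(RowLevelError): ...
class JsonParseError(RowLevelError): ...
class ConstraintViolationError(RowLevelError): ...

def classify(exc):
    """Return 'ROW' for per-record errors, 'SYSTEM' for everything else."""
    if isinstance(exc, RowLevelError):
        return "ROW"      # bypass to the error handler / DLQ
    return "SYSTEM"       # fail-fast, preserving existing semantics

def handle(exc, row, dlq):
    """Route a failing row to the DLQ, or re-raise system-level errors."""
    if classify(exc) == "ROW":
        dlq.append((row, str(exc)))    # main chain continues
    else:
        raise exc                      # e.g. connection failure, OOM
```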
   
   ## 3) DLQ Processing (Error Handler / DLQ)
   
   Once a row enters the DLQ, the configured **Mode** decides how the row-level error is handled:
   
   * **DISABLE**: Discard the erroneous data directly (main process continues)
   
   * **LOG**: Log the error (main process continues)
   
   * **ROUTE**: Build a unified error record and write it to the error sink 
(the actual DLQ)
   
   In **ROUTE** mode:
   
   1. **Build unified error record**
   
   The bad row is wrapped into a unified structure containing:
   
   `error_stage / plugin_name / error_type / error_message / original_data / 
occur_time / job_id / checkpoint_id`
   
   2. **Update counters**
   
   Update the error count and error ratio statistics.
   
   3. **Threshold exceeded?**
   
   * If `max_error_ratio` or `max_error_records` is exceeded, the job **fails fast** immediately (to prevent too much dirty data from being silently swallowed).
   
   * If not exceeded, the record proceeds to the asynchronous write path.
   
   4. **Enqueue to bounded async queue**
   
   For performance protection, the main chain does not directly write to the 
error table synchronously; instead, it first enqueues the error.
   
   5. **Queue Overflow Policy**
   
   * **FAIL**: Fail when the queue is full (most stringent)
   
   * **DROP**: Discard error records (main process continues)
   
   * **BLOCK**: Block while waiting for space in the queue (creates back 
pressure on the main process)
   
   6. **Async Batch Writer → Error Sink**
   
   * Writes error records to your configured DLQ Sink (e.g., JDBC table) in the 
background in batches.
   
   7. **Error Sink OK?**
   
   * **Yes**: Ack complete
   
   * **No**: **Fail-fast (Error sink failure)**
   
   As stated above: if the error sink itself fails, the job fails immediately; the semantics stay simple and clear.
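   The counter and threshold logic from steps 2–3 could be sketched as follows (illustrative Python; only the option names `max_error_ratio` and `max_error_records` are taken from the config above):

```python
class ErrorThreshold:
    """Tracks the error count and error ratio, deciding when to fail fast."""

    def __init__(self, max_error_ratio, max_error_records):
        self.max_error_ratio = max_error_ratio
        self.max_error_records = max_error_records
        self.total = 0
        self.errors = 0

    def record(self, is_error):
        """Update counters for one row; return True if the job should fail fast."""
        self.total += 1
        if is_error:
            self.errors += 1
        if self.errors > self.max_error_records:
            return True
        return (self.errors / self.total) > self.max_error_ratio
```

A real implementation would likely also require a minimum row count before applying the ratio check, so a single early error does not trip the threshold.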
   
   ## 4) Configuration Activation Order (3-Level Config Resolution)
   
   The bottom right corner of the diagram illustrates the configuration 
resolution priority:
   
   **Plugin-level > Stage-level > Global**
   
   The **Resolved Config** is then injected into the Error Handler, ensuring that within the same job:
   
   * Global settings can uniformly control enablement, mode, thresholds, and the error sink
   
   * A specific stage or plugin can override them for finer-grained control
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

