hawk9821 commented on code in PR #10306:
URL: https://github.com/apache/seatunnel/pull/10306#discussion_r2735272154
##########
docs/en/concept/error-handling.md:
##########
@@ -0,0 +1,310 @@
+# Error Handling (Experimental)
+
+In SeaTunnel, the default behavior is that if any Connector or Transform throws an exception, **the entire job fails**.
+
+This experimental capability lets users change that behavior, allowing the engine to **capture error records, route them to an error sink, and keep the job running when conditions permit**.
+
+> **Status: Experimental**
+>
+> Error handling is currently validated only with the JDBC Sink; error handling and row-level error routing are disabled by default. The configuration and semantics may be adjusted in future versions.
+
+## Use Cases
+
+Typical scenarios where enabling error handling is recommended include, but are not limited to:
+
+- A small amount of dirty data in large offline batch jobs (such as invalid dates or overly long strings);
+- Occasional primary key or unique constraint conflicts in sink tables;
+- Jobs that must stay available despite individual bad records, with error data recorded separately for later troubleshooting and data backfilling.
+
+Scenarios where error handling is not recommended, or should be used with caution:
+
+- Strict at-least-once or exactly-once requirements where every valid record must be written;
+- Complex multi-table sinks that must maintain strict consistency semantics across tables.
+
+## Overall Approach
+
+With error handling enabled, the engine processes each record as follows:
+
+1. The record is first processed normally through the original Transform / Sink logic;
+2. If an exception occurs during processing, the engine attempts to classify it:
+   - **Row-level error**: An exception caused by the data itself (such as a data format error or a constraint conflict);
+   - **System-level error**: An infrastructure issue such as a dropped connection or resource exhaustion (OOM);
+3. For system-level errors, the behavior matches the default: the job fails directly;
+4. For row-level errors, the engine hands the record and the exception information to the **ErrorHandler**:
+   - `mode = LOG`: Only log the error;
+   - `mode = ROUTE`: In addition to logging, write the error record to a separately configured **error sink** (such as a JDBC error table).
+
+All other records continue downstream along the original pipeline.
+
+Error handling behavior is controlled through **env configuration**:
+
+- **Stage-level (env)**: Configure the default behavior for this stage
uniformly in `env.transform_error_handler` / `env.sink_error_handler`;
+- **Global (env)**: Provide default values for all stages in
`env.error_handler`.
+
+Some Transforms (such as JsonPath and DataValidator) retain their own earlier row-error options, such as `row_error_handle_way`. These options can coexist with the engine-level mechanism introduced in this document, but they are not yet automatically merged with `env.*_error_handler`.
+
+## Core Concepts
+
+### Mode
+
+The most commonly used configuration field is `mode`:
+
+- `DISABLE`: Disable error handling for this stage (the default behavior);
+- `LOG`: Log row-level errors only; do not route them to an error sink;
+- `ROUTE`: Log row-level errors and route them to the error sink.
+
+If the above options are not configured at all, SeaTunnel's behavior remains
consistent with historical versions: any exception will cause the job to fail.
+
+### Error Sink
+
+The **error sink** is a dedicated sink that receives error records. It is configured under `..._error_handler.sink`, for example:
+
+```hocon
+env {
+ sink_error_handler {
+ mode = "ROUTE"
+
+ sink {
+ plugin_name = "Jdbc"
+ error_table = "orders_sink_error_basic"
+ # Configure Jdbc Sink options for the error table here
+ }
+ }
+}
+```
+
+A common usage pattern (sketched below) is:
+
+- The main sink writes to the business table (e.g., `orders_from_sink`);
+- The error sink writes to an error table (e.g., `orders_sink_error_*`) for later troubleshooting and data backfilling.
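+For illustration, a minimal end-to-end job using this pattern. The source block, table names, and connection comments are placeholders, not prescribed values; the error sink takes the same connection options as a normal Jdbc sink:
+
+```hocon
+env {
+  job.mode = "BATCH"
+
+  sink_error_handler {
+    mode = "ROUTE"
+
+    sink {
+      plugin_name = "Jdbc"
+      error_table = "orders_sink_error_basic"
+      # url / driver / credentials: same options as a normal Jdbc sink
+    }
+  }
+}
+
+source {
+  # any source producing the orders rows; options elided
+}
+
+sink {
+  Jdbc {
+    # regular Jdbc sink writing the business table, e.g. orders_from_sink
+  }
+}
+```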
+
+### Row-Level Error vs System-Level Error
+
+In most cases, users do not need to write logic to decide whether an exception is a row-level error.
+
+The engine attempts the distinction itself:
+
+- **Row-level error**: Usually caused by a single piece of data; when the configuration allows, the engine can bypass that record and continue the job;
+- **System-level error**: Usually an infrastructure issue such as a dropped connection or resource exhaustion (OOM); it directly fails the job.
+
+The current version's default classification strategy (important):
+
+- **Sink stage**: If the Sink Connector does not implement
`SupportRowLevelError`, its exceptions will be treated as system-level errors
(even if `sink_error_handler` is configured, the job will still fail).
+- **Transform stage**: If the Transform does not implement
`SupportRowLevelError`, its exceptions will be treated as system-level errors
(even if `transform_error_handler` is configured, the job will still fail).
+
+Some Connectors (such as JDBC) explicitly declare through this interface which exceptions are row-level errors; the engine gives priority to such explicit declarations.
+
+Only Connectors/Transforms that implement `SupportRowLevelError` can trigger
row-level errors; otherwise, all exceptions will be treated as system-level
errors and cause the job to fail.
+
+> Note
+>
+> This document describes the current version's **generic engine-level behavior**. In the future, more built-in Transforms will gradually implement `SupportRowLevelError` so that row-level and system-level errors can be distinguished more accurately.
+
+### What Happens When a Row-Level Error Occurs in the Transform Stage (Important)
+
+When a Transform exception is classified as a row-level error, **the record is dropped from the main pipeline**: it does not enter subsequent Transforms or downstream Sinks.
+
+- For `map(...)`: the Transform returns `null`, equivalent to filtering out the record;
+- For `flatMap(...)`: the Transform returns an empty list, equivalent to dropping the record.
+
+If `mode = ROUTE` is enabled and an error sink is configured, the original record and its exception information are still written to the error table for troubleshooting and data backfilling.
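+For illustration, a minimal sketch of routing Transform-stage row errors to an error table (the table name is a placeholder; this only takes effect for Transforms that implement `SupportRowLevelError`):
+
+```hocon
+env {
+  transform_error_handler {
+    mode = "ROUTE"
+
+    sink {
+      plugin_name = "Jdbc"
+      error_table = "orders_transform_error"
+      # connection options as for a normal Jdbc sink
+    }
+  }
+}
+```
+
+With this configuration, a record that fails inside a Transform is dropped from the main pipeline but still lands in `orders_transform_error` together with its exception information.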
+
+## Configuration and Parameter Description
+
+### Where to Configure?
+
+Error handling currently takes effect mainly through **env configuration**:
+
+- **Stage-level (env)**: Configure the default behavior for this stage
uniformly in `env.transform_error_handler` / `env.sink_error_handler`, for
example:
+
+ ```hocon
+ env {
+ transform_error_handler {
+ mode = "ROUTE"
+
+ sink {
+ plugin_name = "Jdbc"
+ error_table = "orders_transform_error_from_env"
+ }
+ }
+
+ sink_error_handler {
+ mode = "ROUTE"
+ queue_capacity = 10000
+ queue_overflow_policy = "FAIL"
+
+ sink {
+ plugin_name = "Jdbc"
+ error_table = "orders_sink_error_from_env"
+ }
+ }
+ }
+ ```
+
+- **Global (env)**: Provide default values for all stages in
`env.error_handler`, for example:
+
+ ```hocon
+ env {
+ error_handler {
+ mode = "LOG"
+ include_original_data = true
+ include_stacktrace = false
+ }
+ }
+ ```
+
+Override order for parameters with the same name, from high to low (see the example below):
+
+1. Stage-level `env.transform_error_handler` / `env.sink_error_handler`;
+2. Global `env.error_handler` (defaults to `DISABLE`).
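+For illustration, how the two levels combine under the per-parameter override order above (the values are arbitrary):
+
+```hocon
+env {
+  # Global defaults for all stages
+  error_handler {
+    mode = "LOG"
+    include_original_data = true
+  }
+
+  # Sink stage: overrides `mode`; parameters it does not set fall back to
+  # the global defaults above (here, include_original_data = true)
+  sink_error_handler {
+    mode = "ROUTE"
+
+    sink {
+      plugin_name = "Jdbc"
+      error_table = "orders_sink_error"
+    }
+  }
+}
+```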
+
+The existing error handling options of individual Transform / Sink plugins (such as `row_error_handle_way` in JsonPath / DataValidator) are currently **independent** of the env configuration above: plugin-internal options only affect the plugin's internal behavior, while `env.*_error_handler` controls the engine-level row-level error bypass capability.
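+For illustration, the two mechanisms side by side (this assumes JsonPath's documented `row_error_handle_way` option; the `columns` configuration is elided):
+
+```hocon
+env {
+  # Engine-level: row-level error bypass, independent of the option below
+  transform_error_handler {
+    mode = "LOG"
+  }
+}
+
+transform {
+  JsonPath {
+    # Plugin-internal: JsonPath's own handling of rows its expressions reject
+    row_error_handle_way = "SKIP"
+    # columns = [ ... ]
+  }
+}
+```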
+
+### General Parameters Overview
+
+| Parameter | Type | Default | Description / Values |
+|---|---|---|---|
+| `mode` | String | `DISABLE` | Row-level error handling mode: `DISABLE` (off), `LOG` (log only), `ROUTE` (log and route to error sink). |
+| `max_error_ratio` | Double | `0.0` | Allowed error ratio, 0.0–1.0; for example, `0.01` fails the job once error records exceed 1%; `0.0` disables the ratio-based failure trigger. |
+| `max_error_ratio_min_records` | Integer | `100` | Warm-up threshold for `max_error_ratio`: while total processed records are below this value, ratio checks are skipped to avoid premature failure on very small samples. |
+| `max_error_records` | Long | `0` | Maximum total number of error records allowed; `0` disables the count-based failure trigger. |
+| `queue_capacity` | Integer | `10000` | Capacity of the internal error queue (buffer): the maximum number of error records buffered at once. |
+| `queue_overflow_policy` | String | `FAIL` | Strategy when the error queue is full: `FAIL` (fail the job), `DROP` (drop new error records), `BLOCK` (block the error-producing thread; may affect throughput). |
+| `include_original_data` | Boolean | `false` | Whether to include the original data content in error records. |
+| `include_stacktrace` | Boolean | `false` | Whether to include the complete Java exception stack in error records; enabling this increases the size of each error record. |
+| `original_data_format` | String | `TEXT` | **Reserved parameter.** The current version only supports `TEXT`; original data is uniformly written to the error table as a string (`original_data` is the string representation of the record, i.e., `String.valueOf(row)`). |
+| `original_data_max_length` | Integer | `8192` | Maximum length of serialized original data; anything longer is truncated. Controls the size of individual error records. |
+
+Threshold statistics scope: the current version uses internal counters. The Sink counts 1 per `write(...)` call; the Transform chain counts 1 per `map(...)` / `flatMap(...)` call; multiple operators on the same Transform chain share one counter.
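+For illustration, how the thresholds interact (the values are arbitrary):
+
+```hocon
+env {
+  sink_error_handler {
+    mode = "ROUTE"
+    max_error_ratio = 0.01             # fail once errors / processed > 1% ...
+    max_error_ratio_min_records = 1000 # ... but only after 1000 records have been processed
+    max_error_records = 500            # independently, fail once total errors exceed 500
+  }
+}
+```
+
+With these values, 15 errors within the first 1,000 processed records do not trigger the ratio check (warm-up), while 15 errors at 1,200 processed records (1.25% > 1%) fail the job.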
+
+### Error Sink Related Parameters Overview
+
+Configure where error records should be written under `..._error_handler.sink`:
+
+| Parameter | Type | Description |
+|---|---|---|
+| `plugin_name` | String | Connector name used by the error sink, such as `Jdbc`. |
+| `error_table` | String | (JDBC-specific) Target table name for error records, such as `orders_sink_error_basic`. |
+
+In addition, the error sink needs the regular parameters of its Connector, such as JDBC's `url`, `user`, `password`, and `driver`, written exactly as for a normal Sink.
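+For example (the connection values are placeholders; the exact option names follow the regular Jdbc sink documentation):
+
+```hocon
+sink_error_handler {
+  mode = "ROUTE"
+
+  sink {
+    plugin_name = "Jdbc"
+    error_table = "orders_sink_error_basic"
+    url = "jdbc:mysql://localhost:3306/errors"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user"
+    password = "st_password"
+  }
+}
+```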
+
+If `mode = ROUTE` but no `sink { ... }` is configured (or `plugin_name` is
empty), row-level errors will be identified and logged, but since there is no
available error sink, error records will not be written to the error table.
+
+### Error Table Structure
+
+Currently, the engine constructs a unified error table schema for the error
sink (taking JDBC as an example):
+
+- `error_stage`: String, the stage where the error occurred (such as
`TRANSFORM` / `SINK`);
+- `plugin_type`: String, plugin type (such as `TRANSFORM` / `SINK`);
+- `plugin_name`: String, plugin name (such as `Jdbc`, etc.);
+- `source_table_path`: String, source table path or identifier;
+- `error_message`: String, short error message of the exception (truncated to an internal upper limit);
+- `exception_class`: String, exception class name;
+- `stacktrace`: String, complete stack information (only filled when
`include_stacktrace = true`);
+- `original_data`: String, original data content (only filled when
`include_original_data = true`, length controlled by
`original_data_max_length`);
+- `occur_time`: Timestamp, error occurrence time (UTC).
+
+The above field names remain consistent across different error tables for
unified query and analysis.
+
+## How JDBC Error Handling Works (Key)
+
+JDBC is currently the main Connector that uses the row-level error handling capability.
+
+### What Counts as "Row-Level Error" in JDBC?
+
+`JdbcSinkWriter` inspects the `SQLException` chain. If it finds:
+
+- an `SQLState` starting with `22`: a data exception, such as data too long or a type mismatch (e.g., MySQL's `22001` "Data too long for column");
+- an `SQLState` starting with `23`: an integrity constraint violation, such as a primary key or unique key conflict (e.g., MySQL's `23000` "Duplicate entry");
+
+it treats the exception as a **row-level error**. Otherwise the exception is treated as a **system-level error** and the job fails directly.
+
+For other Sinks that do not implement the `SupportRowLevelError` interface, the engine conservatively treats exceptions as system-level errors: even if `sink_error_handler` is configured, such exceptions are not bypassed as row-level errors and the job fails directly.
+
+### What Happens to Batch Processing When a Row-Level Error Occurs?
+
+JDBC Sink typically puts multiple records in a JDBC batch and sends them to
the database at once.
+
+When a **row-level error** occurs while writing a record:
+
+- The Connector will catch this exception;
+- If it determines this is a "row-level data error", it will call a helper
method to **clear the current JDBC batch in memory**.
+
+This means:
+
+- All records in the current batch that have been added but not yet actually sent to the database are cleared together;
+- The bad record is handed to the error handler (logged / written to the error table);
+- Other "good" records in the same batch are **not automatically retried**.
+
+From the user's perspective:
+
+> **Once a row-level error appears in a batch, the entire batch is treated as an "error batch".**
+
+For example, if a batch holds 100 records and one of them hits a unique-key conflict, the other 99 records in that batch are also cleared and are not retried.
+
+Therefore, with **batching and error handling both enabled**:
+
+- A small number of otherwise valid records may not be written to the target database because they shared a batch with error data;
+- Strict at-least-once semantics for "all valid records" are no longer formally guaranteed under this combination.
+
+The above is a current Connector-level implementation detail; Sink implementations will be gradually improved to reduce the chance of affecting valid records and to improve traceability.
+
+### JDBC Usage Recommendations
+
+- If you care more about job stability and can accept a small number of valid
records being dropped in error batches:
+ - You can enable error handling and retain batch writing;
+ - Error tables and logs can be used for post-hoc analysis and data
backfilling of error data.
+
+- If you have strict requirements for "no valid record shall be lost":
+ - Consider disabling JDBC row-level error handling, or
+  - When enabling error handling, reduce `batch_size` (even to `1`) so that each batch contains at most one record (see the sketch after this list);
+ - It is strongly recommended to thoroughly validate with your actual
database and JDBC driver in a test environment before enabling this capability
in production.
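+For illustration, a conservative combination for the "no valid record shall be lost" case (this assumes the Jdbc sink's regular `batch_size` option; table names and connection comments are placeholders):
+
+```hocon
+env {
+  sink_error_handler {
+    mode = "ROUTE"
+    include_original_data = true   # keep the full record for backfilling
+
+    sink {
+      plugin_name = "Jdbc"
+      error_table = "orders_sink_error_strict"
+      # connection options as for a normal Jdbc sink
+    }
+  }
+}
+
+sink {
+  Jdbc {
+    batch_size = 1   # each batch holds one record, so an error batch loses no valid rows
+    # regular Jdbc sink options
+  }
+}
+```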
+
+## Current Status of Multi-Table Sink
+
+> **Experimental capability, not yet fully supported.**
+
+## Basic Configuration Example (Single-Table JDBC Sink)
+
+Below is a minimal example demonstrating how to route row-level errors to a
JDBC error table in the Sink stage:
+
+```hocon
+env {
+ sink_error_handler {
+ mode = "ROUTE" # or LOG / DISABLE
+ max_error_ratio = 0.01 # Fail job when error ratio > 1%
+ max_error_records = 1000 # Or when total errors > 1000
+ queue_capacity = 10000
+ queue_overflow_policy = "FAIL" # FAIL / DROP / BLOCK
+
+ include_original_data = true
+ include_stacktrace = false
+ original_data_format = "TEXT"
+ original_data_max_length = 8192
+
+ sink {
+ plugin_name = "Jdbc"
+ error_table = "orders_sink_error_basic"
+ # Configure Jdbc Sink options for the error table here
+ }
+ }
+}
+```
+
+### MySQL Error Table Structure
+
+If you use MySQL as the error sink, you need to create the table manually with
the following structure:
Review Comment:
Built-in error table structure: would automatic creation be better?