Hi Flink devs,
I’d like to start a discussion on a potential enhancement to Flink SQL
around *declarative error handling and side-output semantics*.
This is an early-stage proposal and I’m mainly looking for feedback on
feasibility, scope, and design direction.
------------------------------
1. Background
In production streaming systems built on Flink SQL, data quality issues are
a persistent challenge, especially in:
- Kafka ingestion pipelines
- CDC streams
- Log/event processing systems
- Schema-evolving data sources
- UDF-heavy transformation pipelines
Common failure modes include:
- malformed JSON / Avro records
- type mismatches during casting
- schema drift (missing/extra fields)
- runtime UDF exceptions
- partial parsing failures
------------------------------
2. Current Limitations
Today, Flink SQL provides only partial solutions:
- TRY_CAST / defensive expressions
- json.ignore-parse-errors
- null-based filtering
- fallback to DataStream API
However, these approaches are fragmented and do not provide a unified
abstraction.
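To make the fragmentation concrete, here is a sketch of what defensive
handling typically looks like today (table and column names are
hypothetical):

-- Format level: silently skip unparsable records, with no visibility
-- into what was dropped
CREATE TABLE source_table (
  user_id STRING,
  amount  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);

-- Expression level: TRY_CAST turns failures into NULLs, which must then
-- be filtered out; the invalid records are lost entirely
INSERT INTO clean_sink
SELECT TRY_CAST(user_id AS BIGINT), TRY_CAST(amount AS DECIMAL(10, 2))
FROM source_table
WHERE TRY_CAST(user_id AS BIGINT) IS NOT NULL;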
Key gaps:
2.1 No unified error handling model
Different failure modes require different mechanisms, with no consistent
cross-operator abstraction.
2.2 No SQL-level side output semantics
Unlike the DataStream API (OutputTag), SQL has no way to express "route
invalid records to a separate sink".
2.3 No structured error observability
Even when errors are handled, there is:
- no error type
- no raw record preservation
- no column-level context
- no standardized routing mechanism
------------------------------
3. Proposal: ON ERROR Clause
We propose introducing an optional SQL clause:
INSERT INTO clean_sink
SELECT *
FROM source_table
ON ERROR INTO dirty_sink;
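In this basic form, dirty_sink only needs to capture the failed record
itself. A purely illustrative definition (connector and options are
assumptions, not part of the proposal):

CREATE TABLE dirty_sink (
  raw_record STRING  -- the original serialized record that failed processing
) WITH (
  'connector' = 'kafka',
  'topic' = 'dirty-records',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'raw'
);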
------------------------------
4. Semantics (Initial Proposal)
The execution semantics are:
- Records successfully processed → clean_sink
- Records failing during:
  - parsing
  - expression evaluation
  - UDF execution
  - other operator-level processing
  → routed to dirty_sink
Importantly:
- main pipeline execution is NOT interrupted
- error handling is non-fatal by default
------------------------------
5. Optional Extensions (For Discussion)
5.1 Error context propagation
ON ERROR INTO dirty_sink WITH CONTEXT;
This would allow enriching the dirty stream with:
- raw record
- error message
- error type
- failing field (if available)
- timestamp
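One possible shape of the enriched dirty record (column names and types
are illustrative only):

CREATE TABLE dirty_sink (
  raw_record    STRING,        -- original record as received from the source
  error_message STRING,        -- exception message from the failing operator
  error_type    STRING,        -- e.g. 'PARSE_ERROR', 'TYPE_ERROR', 'UDF_ERROR'
  failing_field STRING,        -- column that triggered the failure, if known
  error_ts      TIMESTAMP(3)   -- when the failure was observed
) WITH (
  'connector' = 'kafka',
  'topic' = 'dirty-records',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);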
------------------------------
5.2 Error classification routing
ON ERROR INTO (
parse_error_sink,
type_error_sink,
udf_error_sink
);
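The positional mapping above leaves implicit which errors go to which
sink; an explicit variant could name the error classes directly (the
class names here are placeholders for discussion, not a fixed taxonomy):

ON ERROR (
  PARSE_ERROR INTO parse_error_sink,
  TYPE_ERROR  INTO type_error_sink,
  UDF_ERROR   INTO udf_error_sink
);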
------------------------------
5.3 Retry / fallback semantics (future extension)
ON ERROR
RETRY 3 TIMES
BACKOFF 2s
THEN INTO dirty_sink;
------------------------------
6. Execution Model
Conceptually, this introduces *dual-output operator semantics*:
record
|
v
+------------------------+
| ErrorAwareOperator |
| (try/catch boundary) |
+------------------------+
| |
| |
v v
clean stream dirty stream
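As a rough SQL analogy, the planner could expand ON ERROR into something
like a statement set with two INSERTs, except that the split between the
branches is decided by runtime success/failure rather than by a WHERE
predicate (a conceptual sketch only; the dirty branch below is not
expressible as a predicate today):

EXECUTE STATEMENT SET
BEGIN
  -- branch 1: records the ErrorAwareOperator processed successfully
  INSERT INTO clean_sink SELECT * FROM source_table;
  -- branch 2: records that raised an exception anywhere in the pipeline
  INSERT INTO dirty_sink SELECT * FROM source_table;
END;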
------------------------------
7. Relation to Existing Flink Concepts
This proposal is essentially a SQL-level abstraction over existing runtime
capabilities:
- DataStream Side Output (OutputTag)
- per-record try/catch execution
- operator-level failure isolation
The key gap is *exposing these capabilities at the SQL layer in a
declarative form*.
------------------------------
8. Design Considerations & Risks
I want to explicitly call out a few open concerns:
8.1 Performance overhead
Introducing per-record exception boundaries may add:
- CPU overhead (try/catch)
- potential JIT inhibition
We may need optimized execution strategies (e.g. selective instrumentation
or vectorized paths).
------------------------------
8.2 Semantics of exactly-once
Open question:
- Should the dirty stream be exactly-once, at-least-once, or best-effort?
- Should it participate in checkpointing consistency?
------------------------------
8.3 Scope definition
We need to clarify whether this applies to:
- INSERT INTO only (likely initial scope)
- or SELECT / views / CTAS / materialized views
------------------------------
8.4 Connector interaction
Error classification may differ across:
- Kafka / filesystem / CDC connectors
- UDF-defined errors
- format-level parsing errors
Standardization may be required.
------------------------------
9. Comparison with Existing Systems
System              | SQL-side error routing | Unified model
--------------------+------------------------+----------------------------
Flink SQL (current) | ❌                      | ❌
Spark SQL           | ❌                      | partial
Trino               | ❌                      | ❌
Flink DataStream    | ✔                      | ✔ (but not exposed in SQL)
------------------------------
10. Motivation Summary
The goal is to bridge the gap between:
- DataStream-level expressiveness (side outputs)
- SQL-level declarative usability
This would enable:
- cleaner production pipelines
- better data quality isolation
- improved observability
- reduced need to drop into DataStream API for error handling
------------------------------
11. Open Questions for the Community
I’d appreciate feedback on:
1. Is ON ERROR too broad for initial scope?
2. Should this be limited to parsing-level errors first?
3. What should the default delivery semantics of dirty streams be?
4. Should error classification be standardized at Flink core level or
connector level?
5. Would this overlap too much with existing TRY_* patterns?
6. Is Calcite the right place to extend the SQL grammar for these semantics?
------------------------------
12. Next Step (if direction is accepted)
If there is alignment, potential follow-ups could be:
- Formal FLIP document (RFC stage)
- Calcite grammar extension
- Planner-level LogicalErrorHandlingNode
- Runtime ErrorAwareOperator prototype
- Benchmark on Kafka ingestion workloads
------------------------------
Closing
I believe Flink SQL is missing a *first-class declarative error handling
abstraction*, and this limits its usability in production data quality
scenarios.
This proposal is an attempt to explore whether we can bring *DataStream-style
side output semantics into SQL in a clean and composable way*.
Feedback on feasibility, design boundaries, and alternative approaches is
very welcome.
------------------------------