From an end-user perspective, I think this sounds like a good idea, and the ergonomics of the SQL feel right. I suspect the multiple sinks outlined in 5.2 could get unwieldy, and the same routing could be achieved with WITH CONTEXT plus subsequent filtering. One thing I'd suggest is some kind of threshold, so that _some_ failed records would route to the dirty sink, but exceeding a limit (either absolute or a percentage) would fail the statement. I guess the SQL would be something like "FAIL AFTER" or along those lines.
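To make the threshold idea concrete, a hypothetical shape for the syntax might be (FAIL AFTER and the limit forms below are illustrative sketches, not proposed grammar):

```sql
INSERT INTO clean_sink
SELECT *
FROM source_table
ON ERROR INTO dirty_sink
FAIL AFTER 1000 ROWS;      -- absolute threshold on routed records

-- or, as a fraction of processed records:
-- ON ERROR INTO dirty_sink
-- FAIL AFTER 5 PERCENT;
```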
Thanks, Robin.

On Fri, 24 Apr 2026 at 12:49, FeatZhang <[email protected]> wrote:

> Hi Flink devs,
>
> I’d like to start a discussion on a potential enhancement to Flink SQL
> around *declarative error handling and side-output semantics*.
>
> This is an early-stage proposal and I’m mainly looking for feedback on
> feasibility, scope, and design direction.
> ------------------------------
> 1. Background
>
> In production streaming systems built on Flink SQL, data quality issues
> are a persistent challenge, especially in:
>
> - Kafka ingestion pipelines
> - CDC streams
> - Log/event processing systems
> - Schema-evolving data sources
> - UDF-heavy transformation pipelines
>
> Common failure modes include:
>
> - malformed JSON / Avro records
> - type mismatches during casting
> - schema drift (missing/extra fields)
> - runtime UDF exceptions
> - partial parsing failures
> ------------------------------
> 2. Current Limitations
>
> Today, Flink SQL provides only partial solutions:
>
> - TRY_CAST / defensive expressions
> - json.ignore-parse-errors
> - null-based filtering
> - fallback to the DataStream API
>
> However, these approaches are fragmented and do not provide a unified
> abstraction. Key gaps:
>
> 2.1 No unified error handling model
>
> Different failure modes require different mechanisms, with no consistent
> cross-operator abstraction.
>
> 2.2 No SQL-level side-output semantics
>
> Unlike the DataStream API (OutputTag), SQL cannot express "route invalid
> records to a separate sink".
>
> 2.3 No structured error observability
>
> Even when errors are handled, there is:
>
> - no error type
> - no raw record preservation
> - no column-level context
> - no standardized routing mechanism
> ------------------------------
> 3. Proposal: ON ERROR Clause
>
> We propose introducing an optional SQL clause:
>
> INSERT INTO clean_sink
> SELECT *
> FROM source_table
> ON ERROR INTO dirty_sink;
> ------------------------------
> 4. Semantics (Initial Proposal)
>
> The execution semantics are:
>
> - Records successfully processed → clean_sink
> - Records failing during:
>   - parsing
>   - expression evaluation
>   - UDF execution
>   - operator-level exceptions
>   → routed to dirty_sink
>
> Importantly:
>
> - main pipeline execution is NOT interrupted
> - error handling is non-fatal by default
> ------------------------------
> 5. Optional Extensions (For Discussion)
>
> 5.1 Error context propagation
>
> ON ERROR INTO dirty_sink WITH CONTEXT;
>
> This would allow enriching the dirty stream with:
>
> - raw record
> - error message
> - error type
> - failing field (if available)
> - timestamp
> ------------------------------
> 5.2 Error classification routing
>
> ON ERROR INTO (
>   parse_error_sink,
>   type_error_sink,
>   udf_error_sink
> );
> ------------------------------
> 5.3 Retry / fallback semantics (future extension)
>
> ON ERROR
>   RETRY 3 TIMES
>   BACKOFF 2s
>   THEN INTO dirty_sink;
> ------------------------------
> 6. Execution Model
>
> Conceptually, this introduces *dual-output operator semantics*:
>
>              record
>                |
>                v
>    +------------------------+
>    |   ErrorAwareOperator   |
>    |  (try/catch boundary)  |
>    +------------------------+
>         |            |
>         v            v
>    clean stream  dirty stream
> ------------------------------
> 7. Relation to Existing Flink Concepts
>
> This proposal is essentially a SQL-level abstraction over existing
> runtime capabilities:
>
> - DataStream side outputs (OutputTag)
> - per-record try/catch execution
> - operator-level failure isolation
>
> The key gap is *exposing this capability at the SQL layer in a
> declarative form*.
> ------------------------------
> 8. Design Considerations & Risks
>
> I want to explicitly call out a few open concerns:
>
> 8.1 Performance overhead
>
> Introducing per-record exception boundaries may add:
>
> - CPU overhead (try/catch)
> - potential JIT inhibition
>
> We may need optimized execution strategies (e.g. selective
> instrumentation or vectorized paths).
> ------------------------------
> 8.2 Exactly-once semantics
>
> Open questions:
>
> - Should the dirty stream be exactly-once, at-least-once, or best-effort?
> - Should it participate in checkpointing consistency?
> ------------------------------
> 8.3 Scope definition
>
> We need to clarify whether this applies to:
>
> - INSERT INTO only (likely initial scope)
> - or SELECT / views / CTAS / materialized views
> ------------------------------
> 8.4 Connector interaction
>
> Error classification may differ across:
>
> - Kafka / filesystem / CDC connectors
> - UDF-defined errors
> - format-level parsing errors
>
> Standardization may be required.
> ------------------------------
> 9. Comparison with Existing Systems
>
> System               | SQL-side error routing | Unified model
> ---------------------+------------------------+---------------------------
> Flink SQL (current)  | ❌                     | ❌
> Spark SQL            | ❌                     | partial
> Trino                | ❌                     | ❌
> Flink DataStream     | ✔                      | ✔ (but not exposed in SQL)
> ------------------------------
> 10. Motivation Summary
>
> The goal is to bridge the gap between:
>
> - DataStream-level expressiveness (side outputs)
> - SQL-level declarative usability
>
> This would enable:
>
> - cleaner production pipelines
> - better data quality isolation
> - improved observability
> - less need to drop into the DataStream API for error handling
> ------------------------------
> 11. Open Questions for the Community
>
> I’d appreciate feedback on:
>
> 1. Is ON ERROR too broad for the initial scope?
> 2. Should this be limited to parsing-level errors first?
> 3. What should the default delivery semantics of dirty streams be?
> 4. Should error classification be standardized at the Flink core level
>    or the connector level?
> 5. Would this overlap too much with existing TRY_* patterns?
> 6. Is Calcite the right place to extend these semantics?
> ------------------------------
> 12. Next Steps (if the direction is accepted)
>
> If there is alignment, potential follow-ups could be:
>
> - Formal FLIP document (RFC stage)
> - Calcite grammar extension
> - Planner-level LogicalErrorHandlingNode
> - Runtime ErrorAwareOperator prototype
> - Benchmarks on Kafka ingestion workloads
> ------------------------------
> Closing
>
> I believe Flink SQL is missing a *first-class declarative error handling
> abstraction*, and this limits its usability in production data quality
> scenarios.
>
> This proposal is an attempt to explore whether we can bring
> *DataStream-style side output semantics into SQL in a clean and
> composable way*.
>
> Feedback on feasibility, design boundaries, and alternative approaches
> is very welcome.
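As a footnote for anyone wanting to experiment before a prototype exists: the dual-output behaviour in section 6, combined with the WITH CONTEXT enrichment in 5.1, can be mimicked in a few lines of framework-free Python. `error_aware_map` and the context field names here are illustrative only, not a proposed API:

```python
# Sketch of the "ErrorAwareOperator" idea: each record is processed inside
# a try/except boundary; successes go to the clean stream, failures go to
# the dirty stream enriched with WITH CONTEXT-style fields.
def error_aware_map(records, transform):
    clean, dirty = [], []
    for record in records:
        try:
            clean.append(transform(record))
        except Exception as exc:
            # Context enrichment: raw record, error type, error message.
            dirty.append({
                "raw": record,
                "error_type": type(exc).__name__,
                "error_message": str(exc),
            })
    return clean, dirty

# Example: casting string payloads to int, standing in for a SQL CAST.
clean, dirty = error_aware_map(["1", "2", "oops", "4"], int)
# clean → [1, 2, 4]; dirty holds one entry for "oops" with a ValueError
```

The main pipeline is never interrupted, matching the non-fatal-by-default semantics in section 4.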
