Hi Everyone, I wanted to comment that this feature would be really useful. I still have many questions about how we will implement it scalably.
I'd have to read the spec for the Confluent solution but from your snippet Timo, it doesn't sound like the right interface for OSS. You would know best though. However I understand the concerns with ON ERROR. Spitballing: - Is this something we can inject dynamically with a SQL hint? Regarding the schema, I favour Bytes. Thanks for driving this feature—it's a pain point for us! Ryan van Huuksloot Staff Engineer, Infrastructure | Streaming Platform [image: Shopify] <https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email> On Mon, Apr 27, 2026 at 8:21 AM Timo Walther <[email protected]> wrote: > Hi Feat, > > more declarative error handling makes a lot of sense. In our Cloud > product at Confluent, we have already implemented error handling for > deserialization errors by adding an additional ProcessFunction closely > after the source with side outputs and a wrapping STATEMENT SET under > the hood. > > We are open to contribute this back if it make sense. > > However, our current solution only handles errors in the source/format. > Of course a global ON ERROR would be more beneficial, however, it is > unclear to me how to implement this in a scalable way. A side output has > costs (output and input network buffers on the sender and receiver > side), we cannot arbitrarily add side outputs to all operators of a SQL > pipeline. Given that even relatively easy SQL might be translates into > 10+ operators, using side outputs might not scale. > > Also, sources currently don't support side outputs. This is why we > decided for a ProcessFunction after the source for now. > > Also what is the schema of the Dead Letter Queue? At Confluent we > decided for BYTES columns from which data can be re-deserialized. > > Cheers, > Timo > > > On 24.04.26 13:49, FeatZhang wrote: > > Hi Flink devs, > > > > I’d like to start a discussion on a potential enhancement to Flink SQL > > around *declarative error handling and side-output semantics*. > > > > This is an early-stage proposal and I’m mainly looking for feedback on > > feasibility, scope, and design direction. > > ------------------------------ > > 1. Background > > > > In production streaming systems built on Flink SQL, data quality issues > are > > a persistent challenge, especially in: > > > > - Kafka ingestion pipelines > > - CDC streams > > - Log/event processing systems > > - Schema-evolving data sources > > - UDF-heavy transformation pipelines > > > > Common failure modes include: > > > > - malformed JSON / Avro records > > - type mismatches during casting > > - schema drift (missing/extra fields) > > - runtime UDF exceptions > > - partial parsing failures > > > > ------------------------------ > > 2. Current Limitations > > > > Today, Flink SQL provides only partial solutions: > > > > - TRY_CAST / defensive expressions > > - json.ignore-parse-errors > > - null-based filtering > > - fallback to DataStream API > > > > However, these approaches are fragmented and do not provide a unified > > abstraction. > > Key gaps: 2.1 No unified error handling model > > > > Different failure modes require different mechanisms, with no consistent > > cross-operator abstraction. > > 2.2 No SQL-level side output semantics > > > > Unlike DataStream API (OutputTag), SQL cannot express: > > > > route invalid records to a separate sink > > > > 2.3 No structured error observability > > > > Even when errors are handled: > > > > - no error type > > - no raw record preservation > > - no column-level context > > - no standardized routing mechanism > > > > ------------------------------ > > 3. Proposal: ON ERROR Clause > > > > We propose introducing an optional SQL clause: > > > > INSERT INTO clean_sink > > SELECT * > > FROM source_table > > ON ERROR INTO dirty_sink; > > > > ------------------------------ > > 4. Semantics (Initial Proposal) > > > > The execution semantics are: > > > > - Records successfully processed → clean_sink > > - Records failing during: > > - parsing > > - expression evaluation > > - UDF execution > > - operator-level exceptions > > > > → routed to dirty_sink > > > > Importantly: > > > > - main pipeline execution is NOT interrupted > > - error handling is non-fatal by default > > > > ------------------------------ > > 5. Optional Extensions (For Discussion) 5.1 Error context propagation > > > > ON ERROR INTO dirty_sink WITH CONTEXT; > > > > This would allow enriching the dirty stream with: > > > > - raw record > > - error message > > - error type > > - failing field (if available) > > - timestamp > > > > ------------------------------ > > 5.2 Error classification routing > > > > ON ERROR INTO ( > > parse_error_sink, > > type_error_sink, > > udf_error_sink > > ); > > > > ------------------------------ > > 5.3 Retry / fallback semantics (future extension) > > > > ON ERROR > > RETRY 3 TIMES > > BACKOFF 2s > > THEN INTO dirty_sink; > > > > ------------------------------ > > 6. Execution Model > > > > Conceptually, this introduces a *dual-output operator semantics*: > > > > record > > | > > v > > +------------------------+ > > | ErrorAwareOperator | > > | (try/catch boundary) | > > +------------------------+ > > | | > > | | > > v v > > clean stream dirty stream > > > > ------------------------------ > > 7. Relation to Existing Flink Concepts > > > > This proposal is essentially a SQL-level abstraction over existing > runtime > > capability: > > > > - DataStream Side Output (OutputTag) > > - per-record try/catch execution > > - operator-level failure isolation > > > > The key gap is *exposing this capability at SQL layer in a declarative > form* > > . > > ------------------------------ > > 8. Design Considerations & Risks > > > > I want to explicitly call out a few open concerns: > > 8.1 Performance overhead > > > > Introducing per-record exception boundaries may introduce: > > > > - CPU overhead (try/catch) > > - potential JIT inhibition > > > > We may need optimized execution strategies (e.g. selective > instrumentation > > or vectorized paths). > > ------------------------------ > > 8.2 Semantics of exactly-once > > > > Open question: > > > > - Should dirty stream be exactly-once, at-least-once, or best-effort? > > - Should it participate in checkpointing consistency? > > > > ------------------------------ > > 8.3 Scope definition > > > > We need to clarify whether this applies to: > > > > - INSERT INTO only (likely initial scope) > > - or SELECT / views / CTAS / materialized views > > > > ------------------------------ > > 8.4 Connector interaction > > > > Error classification may differ across: > > > > - Kafka / filesystem / CDC connectors > > - UDF-defined errors > > - format-level parsing errors > > > > Standardization may be required. > > ------------------------------ > > 9. Comparison with Existing Systems > > System SQL-side error routing Unified model > > Flink SQL (current) ❌ ❌ > > Spark SQL ❌ partial > > Trino ❌ ❌ > > Flink DataStream ✔ ✔ (but not SQL exposed) > > ------------------------------ > > 10. Motivation Summary > > > > The goal is to bridge the gap between: > > > > - DataStream-level expressiveness (side outputs) > > - SQL-level declarative usability > > > > This would enable: > > > > - cleaner production pipelines > > - better data quality isolation > > - improved observability > > - reduced need to drop into DataStream API for error handling > > > > ------------------------------ > > 11. Open Questions for the Community > > > > I’d appreciate feedback on: > > > > 1. Is ON ERROR too broad for initial scope? > > 2. Should this be limited to parsing-level errors first? > > 3. What should the default delivery semantics of dirty streams be? > > 4. Should error classification be standardized at Flink core level or > > connector level? > > 5. Would this overlap too much with existing TRY_* patterns? > > 6. Is Calcite the right place to extend this semantics? > > > > ------------------------------ > > 12. Next Step (if direction is accepted) > > > > If there is alignment, potential follow-ups could be: > > > > - Formal FLIP document (RFC stage) > > - Calcite grammar extension > > - Planner-level LogicalErrorHandlingNode > > - Runtime ErrorAwareOperator prototype > > - Benchmark on Kafka ingestion workloads > > > > ------------------------------ > > Closing > > > > I believe Flink SQL is missing a *first-class declarative error handling > > abstraction*, and this limits its usability in production data quality > > scenarios. > > > > This proposal is an attempt to explore whether we can bring > *DataStream-style > > side output semantics into SQL in a clean and composable way*. > > > > Feedback on feasibility, design boundaries, and alternative approaches is > > very welcome. > > ------------------------------ > > > >
