Hi Timo, Robin, Ryan, and everyone,
Thank you all for the very substantive feedback. The discussion has been
extremely valuable and I'd like to converge it into a concrete v1 scope
before drafting a FLIP.
Let me summarize what I take away from the thread, and propose a scoped-
down plan that I believe addresses every concern raised.
## 1. What the thread told me
Three points are essentially settled:
(a) The pain is real and the direction is worth pursuing.
[Robin / Ryan / Timo all agreed]
(b) The DLQ payload should be raw BYTES (re-deserializable),
NOT a structured per-column schema.
[Timo + Ryan converged independently]
(c) A *global* ON ERROR clause attached to arbitrary INSERTs does not
scale and does not fit v1: side outputs on every operator cost network
buffers, sources cannot emit side outputs today, and the SQL grammar
change overlaps with concerns that belong upstream in Calcite.
[Timo's main objection, and I now agree]
## 2. Proposed v1 scope (drastically narrowed)
I'd like to *drop* the ON ERROR grammar from v1 entirely and instead
ship a connector/format-level Dead Letter Queue, configured purely via
CREATE TABLE WITH options:
```sql
CREATE TABLE source_table (
    id      BIGINT,
    payload STRING,
    ts      TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    -- new options
    'error-handling.mode' = 'dlq',  -- off|log|dlq|fail-after
    'error-handling.dlq.table' = 'dirty_topic',
    'error-handling.fail-after.ratio' = '0.05',
    'error-handling.fail-after.window' = '1min'
);
```
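The example sets `'error-handling.mode' = 'dlq'` together with the `fail-after.*` options. One plausible reading is that fail-after acts as a safety threshold on top of DLQ routing. The JDK-only sketch below models that reading under a second assumption: records are counted in tumbling processing-time windows of `fail-after.window`, and the ratio is checked only when a window closes. `FailAfterGuard` and its methods are hypothetical names, not Flink API.

```java
import java.time.Duration;

/**
 * Hypothetical model of 'error-handling.fail-after.ratio' / '.window'.
 * Assumption: records fall into tumbling processing-time windows, and the
 * job fails when a window closes with failed/total above the ratio.
 */
final class FailAfterGuard {
    private static final long NO_WINDOW = Long.MIN_VALUE;

    private final double maxRatio;
    private final long windowMillis;
    private long windowStart = NO_WINDOW; // start of the open window, if any
    private long total;                   // records seen in the open window
    private long failed;                  // records that failed to deserialize

    FailAfterGuard(double maxRatio, Duration window) {
        if (maxRatio < 0.0 || maxRatio > 1.0) {
            throw new IllegalArgumentException("ratio must be within [0, 1]");
        }
        if (window.isZero() || window.isNegative()) {
            throw new IllegalArgumentException("window must be positive");
        }
        this.maxRatio = maxRatio;
        this.windowMillis = window.toMillis();
    }

    /** Counts one record; entering a new window first evaluates the old one. */
    void record(long nowMillis, boolean failedToDeserialize) {
        long start = nowMillis - Math.floorMod(nowMillis, windowMillis);
        if (start != windowStart) {
            closeWindow();
            windowStart = start;
        }
        total++;
        if (failedToDeserialize) {
            failed++;
        }
    }

    /** Stands in for a processing-time timer firing at nowMillis. */
    void advanceTo(long nowMillis) {
        if (windowStart != NO_WINDOW && nowMillis >= windowStart + windowMillis) {
            closeWindow();
            windowStart = NO_WINDOW;
        }
    }

    /** Throws if the closing window exceeded the ratio, otherwise resets counters. */
    private void closeWindow() {
        if (total > 0) {
            double ratio = (double) failed / total;
            if (ratio > maxRatio) {
                throw new IllegalStateException(String.format(
                        "error ratio %.4f exceeded %.4f in window starting at %d ms",
                        ratio, maxRatio, windowStart));
            }
        }
        total = 0;
        failed = 0;
    }
}
```

With `ratio = 0.05` and `window = 1min`, a window with 5 bad records out of 100 passes, and a window with 6 out of 100 fails the job. The check runs only at window close, so a single bad record at the start of a window (1 of 1, or 100%) does not trip the guard by itself. The sketch leaves open whether the option should also accept an absolute row count, as in Robin's "N rows" variant.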
The DLQ table schema is fixed in v1:
```
raw_payload    BYTES             -- original bytes, re-deserializable
source_table   STRING
error_type     STRING            -- DESERIALIZATION_ERROR / SCHEMA_MISMATCH / ...
error_message  STRING
ingestion_ts   TIMESTAMP_LTZ(3)
```
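The following illustrates why `raw_payload` is BYTES rather than STRING. It assumes a STRING column means the payload is decoded as UTF-8 text, which is what Java's `new String(bytes, UTF_8)` does. That decoding replaces malformed sequences with U+FFFD. Anything that is not valid UTF-8 therefore no longer round-trips, for example a binary Avro record, or text cut off in the middle of a multi-byte character. The helper names are hypothetical and the code uses only the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** JDK-only illustration: STRING storage is lossy for non-UTF-8 payloads, BYTES is exact. */
final class DlqPayloadRoundTrip {
    private DlqPayloadRoundTrip() {}

    /** Assumed STRING path: decode as UTF-8 (malformed input becomes U+FFFD), re-encode. */
    static byte[] viaStringColumn(byte[] raw) {
        String decoded = new String(raw, StandardCharsets.UTF_8);
        return decoded.getBytes(StandardCharsets.UTF_8);
    }

    /** BYTES path: keep an exact copy of the original payload. */
    static byte[] viaBytesColumn(byte[] raw) {
        return Arrays.copyOf(raw, raw.length);
    }

    /** Re-deserialization needs the stored payload to equal the original byte for byte. */
    static boolean identical(byte[] original, byte[] stored) {
        return Arrays.equals(original, stored);
    }
}
```

For example, the two bytes `0xC3 0x28` are not valid UTF-8. Through the STRING path they come back as `0xEF 0xBF 0xBD 0x28` (U+FFFD followed by `(`), while the BYTES path returns them unchanged. Valid UTF-8 JSON survives either path, which is why the problem tends to surface only with binary formats or corrupted input.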
Implementation topology (the one Timo already described):
```
KafkaSource ─► ErrorAwareDeserProcessFunction ─► (downstream SQL DAG)
                             │ side output
                             ▼
                      DLQ Sink (BYTES)
```
The two pipelines are wired into a single job via STATEMENT SET.
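The next block makes the topology concrete with a JDK-only model of the per-record work of the ErrorAwareDeserProcessFunction. It assumes the source emits raw bytes and deserialization happens inside the function. In Flink this would be a `ProcessFunction` that forwards successes with `out.collect(...)` and sends failures to a side output via `ctx.output(dlqTag, row)` with an `OutputTag`. Here both outputs are plain lists so the logic runs standalone. `Deserializer`, `DlqRow`, `ErrorAwareDeser` and `dlqTag` are hypothetical names. Every failure is tagged `DESERIALIZATION_ERROR` because the proposal does not yet pin down how other error types are detected.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical deserializer contract; may throw, like a format's deserializer. */
@FunctionalInterface
interface Deserializer<T> {
    T deserialize(byte[] message) throws Exception;
}

/** One DLQ row, mirroring the fixed v1 DLQ schema. */
final class DlqRow {
    final byte[] rawPayload;   // raw_payload    BYTES
    final String sourceTable;  // source_table   STRING
    final String errorType;    // error_type     STRING
    final String errorMessage; // error_message  STRING
    final Instant ingestionTs; // ingestion_ts   TIMESTAMP_LTZ(3)

    DlqRow(byte[] rawPayload, String sourceTable, String errorType,
           String errorMessage, Instant ingestionTs) {
        this.rawPayload = rawPayload;
        this.sourceTable = sourceTable;
        this.errorType = errorType;
        this.errorMessage = errorMessage;
        this.ingestionTs = ingestionTs;
    }
}

/** Models the per-record logic: clean records to the main output, failures to the DLQ. */
final class ErrorAwareDeser<T> {
    private final String sourceTable;
    private final Deserializer<T> deserializer;
    final List<T> mainOutput = new ArrayList<>();     // stands in for out.collect(...)
    final List<DlqRow> dlqOutput = new ArrayList<>(); // stands in for ctx.output(dlqTag, ...)

    ErrorAwareDeser(String sourceTable, Deserializer<T> deserializer) {
        this.sourceTable = sourceTable;
        this.deserializer = deserializer;
    }

    void processElement(byte[] raw, Instant processingTime) {
        T value;
        try {
            value = deserializer.deserialize(raw);
        } catch (Exception e) {
            // Keep the untouched bytes so the record can be re-deserialized later.
            dlqOutput.add(new DlqRow(raw, sourceTable, "DESERIALIZATION_ERROR",
                    String.valueOf(e.getMessage()), processingTime));
            return;
        }
        // Emitting happens outside the try: downstream failures are not routed to the DLQ.
        mainOutput.add(value);
    }
}
```

Only the `deserialize` call sits inside the `try`. Errors raised further downstream therefore still surface as ordinary failures instead of being silently diverted. This matches the v1 non-goal of leaving CAST, expression and UDF errors alone.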
Why this shape:
* No Calcite changes, no new SQL grammar.
* No per-operator instrumentation — addresses Timo's scalability
concern directly. Cost is bounded by *one* extra ProcessFunction
per source that opts in.
* Sources don't need side-output support, since the PF sits *after*
  the source: the source hands over raw bytes and deserialization
  moves into the PF.
* Default OFF → zero impact on existing jobs.
## 3. How this maps to each of your points
Timo
> "global ON ERROR is not scalable"
v1 instruments only the source-attached PF, opt-in per table.
> "source operator does not support side output today"
Agreed; we attach the PF *after* the source instead.
> "DLQ schema should be BYTES"
Adopted as the only v1 schema.
> "Confluent already has an internal implementation"
This is the part where I'd really value your help — see §5.
Robin
> "FAIL AFTER N% / N rows"
Adopted, but as `error-handling.fail-after.*` connector
options rather than SQL grammar. This keeps the grammar untouched
while still giving operators the safety knob.
Ryan
> "could SQL hints inject this dynamically?"
Reasonable, but I'd like to defer hints to v2: hints affect
plan rewriting, and I'd rather get the runtime substrate
landed first. v1's WITH options can be promoted to a hint
later without breaking compatibility.
> "BYTES schema"
Aligned.
## 4. Explicit Non-Goals for v1
To pre-empt scope creep, v1 will *not*:
* cover CAST / expression / UDF errors (left to v2 with an
explicit LogicalErrorBoundary node, only inserted at user-
declared boundaries — keeping Timo's "pay for what you use"
principle)
* introduce any new SQL clause or hint
* change Calcite
* support per-error-type routing to multiple sinks
* support retry / backoff
## 5. Asks
(1) Timo, would you be open to a brief offline sync on the
Confluent ProcessFunction + STATEMENT SET implementation?
If we can use that as the starting point rather than
reinventing it, we'd save months of work and start from a
design that is already proven in production. Happy to do
this on Slack / Zoom at your convenience.
(2) To everyone — does anyone object to using
`CREATE TABLE WITH ('error-handling.*' = ...)` as the v1
surface? If silence == consent, I'll proceed to draft a
FLIP titled
"FLIP-XXX: Dead Letter Queue for Flink SQL Source/Format"
(note the deliberate rename — narrower scope, narrower
title) and post it within ~2 weeks.
(3) Robin / Ryan — would you be willing to be cited as
use-case references in the FLIP's Motivation section?
Concrete production scenarios significantly help the
vote.
Thanks again for steering this in a much more pragmatic direction.
Best,
FeatZhang
On Mon, Apr 27, 2026 at 10:04 PM Ryan van Huuksloot via dev wrote:
> Hi Everyone,
>
> I wanted to comment that this feature would be really useful. I still have
> many questions about how we will implement it scalably.
>
> I'd have to read the spec for the Confluent solution but from your snippet
> Timo, it doesn't sound like the right interface for OSS. You would know
> best though.
> However I understand the concerns with ON ERROR.
>
> Spitballing:
> - Is this something we can inject dynamically with a SQL hint?
>
> Regarding the schema, I favour Bytes.
>
> Thanks for driving this feature—it's a pain point for us!
>
> Ryan van Huuksloot
> Staff Engineer, Infrastructure | Streaming Platform
> Shopify
>
>
> On Mon, Apr 27, 2026 at 8:21 AM Timo Walther wrote:
>
> > Hi Feat,
> >
> > more declarative error handling makes a lot of sense. In our Cloud
> > product at Confluent, we have already implemented error handling for
> > deserialization errors by adding an additional ProcessFunction closely
> > after the source with side outputs and a wrapping STATEMENT SET under
> > the hood.
> >
> > We are open to contributing this back if it makes sense.
> >
> > However, our current solution only handles errors in the source/format.
> > Of course, a global ON ERROR would be more beneficial; however, it is
> > unclear to me how to implement this in a scalable way. A side output has
> > costs (output and input network buffers on the sender and receiver
> > side), so we cannot arbitrarily add side outputs to all operators of a SQL
> > pipeline. Given that even relatively easy SQL might be translated into
> > 10+ operators, using side outputs might not scale.
> >
> > Also, sources currently don't support side outputs. This is why we
> > decided for a ProcessFunction after the source for now.
> >
> > Also, what is the schema of the Dead Letter Queue? At Confluent we
> > decided on BYTES columns from which data can be re-deserialized.
> >
> > Cheers,
> > Timo
> >
> >
> > On 24.04.26 13:49, FeatZhang wrote:
> > > Hi Flink devs,
> > >
> > > I’d like to start a discussion on a potential enhancement to Flink SQL
> > > around *declarative error handling and side-output semantics*.
> > >
> > > This is an early-stage proposal and I’m mainly looking for feedback on
> > > feasibility, scope, and design direction.
> > > ------------------------------
> > > 1. Background
> > >
> > > In production streaming systems built on Flink SQL, data quality issues are
> > > a persistent challenge, especially in:
> > >
> > > - Kafka ingestion pipelines
> > > - CDC streams
> > > - Log/event processing systems
> > > - Schema-evolving data sources
> > > - UDF-heavy transformation pipelines
> > >
> > > Common failure modes include:
> > >
> > > - malformed JSON / Avro records
> > > - type mismatches during casting
> > > - schema drift (missing/extra fields)
> > > - runtime UDF exceptions
> > > - partial parsing failures
> > >
> > > ------------------------------
> > > 2. Current Limitations
> > >
> > > Today, Flink SQL provides only partial solutions:
> > >
> > > - TRY_CAST / defensive expressions
> > > - json.ignore-parse-errors
> > > - null-based filtering
> > > - fallback to DataStream API
> > >
> > > However, these approaches are fragmented and do not provide a unified
> > > abstraction.
> > > Key gaps:
> > >
> > > 2.1 No unified error handling model
> > >
> > > Different failure modes require different mechanisms, with no consistent
> > > cross-operator abstraction.
> > > 2.2 No SQL-level side output semantics
> > >
> > > Unlike the DataStream API (OutputTag), SQL cannot express:
> > >
> > > route invalid records to a separate sink
> > >
> > > 2.3 No structured error observability
> > >
> > > Even when errors are handled:
> > >
> > > - no error type
> > > - no raw record preservation
> > > - no column-level context
> > > - no standardized routing mechanism
> > >
> > > ------------------------------
> > > 3. Proposal: ON ERROR Clause
> > >
> > > We propose introducing an optional SQL clause:
> > >
> > > INSERT INTO clean_sink
> > > SELECT *
> > > FROM source_table
> > > ON ERROR INTO dirty_sink;
> > >
> > > ------------------------------
> > > 4. Semantics (Initial Proposal)
> > >
> > > The execution semantics are:
> > >
> > > - Records successfully processed → clean_sink
> > > - Records failing during:
> > >   - parsing
> > >   - expression evaluation
> > >   - UDF execution
> > >   - operator-level exceptions
> > >
> > >   → routed to dirty_sink
> > >
> > > Importantly:
> > >
> > > - main pipeline execution is NOT interrupted
> > > - error handling is non-fatal by default
> > >
> > > ------------------------------
> > > 5. Optional Extensions (For Discussion)
> > >
> > > 5.1 Error context propagation
> > >
> > > ON ERROR INTO dirty_sink WITH CONTEXT;
> > >
> > > This would allow enriching the dirty stream with:
> > >
> > > - raw record
> > > - error message
> > > - error type
> > > - failing field (if available)
> > > - timestamp
> > >
> > > ------------------------------
> > > 5.2 Error classification routing
> > >
> > > ON ERROR INTO (
> > > parse_error_sink,
> > > type_error_sink,
> > > udf_error_sink
> > > );
> > >
> > > ------------------------------
> > > 5.3 Retry / fallback semantics (future extension)
> > >
> > > ON ERROR
> > > RETRY 3 TIMES
> > > BACKOFF 2s
> > > THEN INTO dirty_sink;
> > >
> > > ------------------------------
> > > 6. Execution Model
> > >
> > > Conceptually, this introduces *dual-output operator semantics*:
> > >
> > >               record
> > >                 |
> > >                 v
> > >     +------------------------+
> > >     |   ErrorAwareOperator   |
> > >     |  (try/catch boundary)  |
> > >     +------------------------+
> > >         |             |
> > >         |             |
> > >         v             v
> > >   clean stream  dirty stream
> > >
> > > ------------------------------
> > > 7. Relation to Existing Flink Concepts
> > >
> > > This proposal is essentially a SQL-level abstraction over existing runtime
> > > capability:
> > >
> > > - DataStream Side Output (OutputTag)
> > > - per-record try/catch execution
> > > - operator-level failure isolation
> > >
> > > The key gap is *exposing this capability at the SQL layer in a declarative
> > > form*.
> > > ------------------------------
> > > 8. Design Considerations & Risks
> > >
> > > I want to explicitly call out a few open concerns:
> > > 8.1 Performance overhead
> > >
> > > Per-record exception boundaries may add:
> > >
> > > - CPU overhead (try/catch)
> > > - potential JIT inhibition
> > >
> > > We may need optimized execution strategies (e.g. selective instrumentation
> > > or vectorized paths).
> > > ------------------------------
> > > 8.2 Semantics of exactly-once
> > >
> > > Open question:
> > >
> > > - Should the dirty stream be exactly-once, at-least-once, or best-effort?
> > > - Should it participate in checkpointing consistency?
> > >
> > > ------------------------------
> > > 8.3 Scope definition
> > >
> > > We need to clarify whether this applies to:
> > >
> > > - INSERT INTO only (likely initial scope)
> > > - or SELECT / views / CTAS / materialized views
> > >
> > > ------------------------------
> > > 8.4 Connector interaction
> > >
> > > Error classification may differ across:
> > >
> > > - Kafka / filesystem / CDC connectors
> > > - UDF-defined errors
> > > - format-level parsing errors
> > >
> > > Standardization may be required.
> > > ------------------------------
> > > 9. Comparison with Existing Systems
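> > >
> > > | System              | SQL-side error routing | Unified model           |
> > > |---------------------|------------------------|-------------------------|
> > > | Flink SQL (current) | ❌                     | ❌                      |
> > > | Spark SQL           | ❌                     | partial                 |
> > > | Trino               | ❌                     | ❌                      |
> > > | Flink DataStream    | ✔                      | ✔ (but not SQL exposed) |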
> > > ------------------------------
> > > 10. Motivation Summary
> > >
> > > The goal is to bridge the gap between:
> > >
> > > - DataStream-level expressiveness (side outputs)
> > > - SQL-level declarative usability
> > >
> > > This would enable:
> > >
> > > - cleaner production pipelines
> > > - better data quality isolation
> > > - improved observability
> > > - reduced need to drop into DataStream API for error handling
> > >
> > > ------------------------------
> > > 11. Open Questions for the Community
> > >
> > > I’d appreciate feedback on:
> > >
> > > 1. Is ON ERROR too broad for initial scope?
> > > 2. Should this be limited to parsing-level errors first?
> > > 3. What should the default delivery semantics of dirty streams be?
> > > 4. Should error classification be standardized at Flink core level or
> > >    connector level?
> > > 5. Would this overlap too much with existing TRY_* patterns?
> > > 6. Is Calcite the right place to extend this semantics?
> > >
> > > ------------------------------
> > > 12. Next Step (if direction is accepted)
> > >
> > > If there is alignment, potential follow-ups could be:
> > >
> > > - Formal FLIP document (RFC stage)
> > > - Calcite grammar extension
> > > - Planner-level LogicalErrorHandlingNode
> > > - Runtime ErrorAwareOperator prototype
> > > - Benchmark on Kafka ingestion workloads
> > >
> > > ------------------------------
> > > Closing
> > >
> > > I believe Flink SQL is missing a *first-class declarative error handling
> > > abstraction*, and this limits its usability in production data quality
> > > scenarios.
> > >
> > > This proposal is an attempt to explore whether we can bring
> > > *DataStream-style side output semantics into SQL in a clean and composable
> > > way*.
> > >
> > > Feedback on feasibility, design boundaries, and alternative approaches is
> > > very welcome.
> > > ------------------------------
> > >
> >
> >
>