Hi Timo, Robin, Ryan, and everyone,

Thank you all for the very substantive feedback. The discussion has been
extremely valuable, and I'd like to distill it into a concrete v1 scope
before drafting a FLIP.

Let me summarize what I take away from the thread, and propose a scoped-
down plan that I believe addresses every concern raised.


## 1. What the thread told me

Three points are essentially settled:

  (a) The pain is real and the direction is worth pursuing.
      [Robin / Ryan / Timo all agreed]

  (b) The DLQ payload should be raw BYTES (re-deserializable),
      NOT a structured per-column schema.
      [Timo + Ryan converged independently]

  (c) A *global* ON ERROR clause attached to arbitrary INSERTs is
      not scalable: every side output costs network buffers, sources
      do not support side outputs today, and the SQL-grammar change
      overlaps with concerns that belong to Calcite upstream.
      [Timo's main objection — and I now agree]


## 2. Proposed v1 scope (drastically narrowed)

I'd like to *drop* the ON ERROR grammar from v1 entirely and instead
ship a connector/format-level Dead Letter Queue, configured purely via
CREATE TABLE WITH options:

  CREATE TABLE source_table (
    id BIGINT,
    payload STRING,
    ts TIMESTAMP(3)
  ) WITH (
    'connector' = 'kafka',
    'format'    = 'json',
    -- new options
    'error-handling.mode'              = 'dlq',  -- off|log|dlq|fail-after
    'error-handling.dlq.table'         = 'dirty_topic',
    'error-handling.fail-after.ratio'  = '0.05',
    'error-handling.fail-after.window' = '1min'
  );

DLQ table schema is fixed in v1:

  raw_payload    BYTES        -- original bytes, re-deserializable
  source_table   STRING
  error_type     STRING       -- DESERIALIZATION_ERROR / SCHEMA_MISMATCH / ...
  error_message  STRING
  ingestion_ts   TIMESTAMP_LTZ(3)
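
For illustration only, a DLQ table matching this fixed schema could be
declared roughly as below. This is a sketch, not part of the proposal:
the topic name, the broker address, the startup mode, and the choice of
'json' as the DLQ's own format are all assumptions made for the example.

```sql
-- Hypothetical DLQ table; column list follows the fixed v1 schema above.
CREATE TABLE dirty_topic (
  raw_payload    BYTES,             -- original bytes, re-deserializable
  source_table   STRING,
  error_type     STRING,
  error_message  STRING,
  ingestion_ts   TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'dirty_topic',                               -- assumed name
  'properties.bootstrap.servers' = 'localhost:9092',     -- assumed address
  'scan.startup.mode' = 'earliest-offset',               -- only needed when reading back
  'format' = 'json'                                      -- assumed DLQ encoding
);

-- Inspecting failed records: decode the raw bytes and probe a field.
-- Assumes the original payload was UTF-8 encoded JSON.
SELECT
  source_table,
  error_type,
  error_message,
  JSON_VALUE(DECODE(raw_payload, 'UTF-8'), '$.id') AS id_text
FROM dirty_topic
WHERE error_type = 'SCHEMA_MISMATCH';
```

Because raw_payload keeps the original bytes, the same records can later
be re-read with a corrected schema or format instead of being lost.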

Implementation topology — the one Timo already described:

  KafkaSource ─► ErrorAwareDeserProcessFunction ─► (downstream SQL DAG)
                       │ side output
                       ▼
                  DLQ Sink (BYTES)

The two pipelines are wired into a single job via STATEMENT SET.
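
For readers less familiar with STATEMENT SET, the user-facing form looks
like the sketch below: several INSERTs are planned and submitted as one
job. In the proposed design the DLQ branch would be added under the hood
rather than written by the user, so this is only an analogy; clean_sink
and audit_sink are hypothetical tables.

```sql
-- Two INSERTs over the same source, executed as a single Flink job.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO clean_sink SELECT id, payload, ts FROM source_table;
  INSERT INTO audit_sink SELECT id, ts FROM source_table;
END;
```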

Why this shape:
  * No Calcite changes, no new SQL grammar.
  * No per-operator instrumentation — addresses Timo's scalability
    concern directly. Cost is bounded by *one* extra ProcessFunction
    (PF) per source that opts in.
  * Sources don't need side-output support, since the PF sits *after*
    the source.
  * Default OFF → zero impact on existing jobs.


## 3. How this maps to each of your points

  Timo
    > "global ON ERROR is not scalable"
        v1 instruments only the source-attached PF, opt-in per table.
    > "source operator does not support side output today"
        Agreed; we attach the PF *after* the source instead.
    > "DLQ schema should be BYTES"
        Adopted as the only v1 schema.
    > "Confluent already has an internal implementation"
        This is the part where I'd really value your help — see §5.

  Robin
    > "FAIL AFTER N% / N rows"
        Adopted, but as `error-handling.fail-after.*` connector
        options rather than SQL grammar. Keeps grammar untouched
        and still gives operators the safety knob.

  Ryan
    > "could SQL hints inject this dynamically?"
        Reasonable, but I'd like to defer hints to v2: hints affect
        plan rewriting, and I'd rather get the runtime substrate
        landed first. v1's WITH options can be promoted to a hint
        later without breaking compatibility.
    > "BYTES schema"
        Aligned.


## 4. Explicit Non-Goals for v1

To pre-empt scope creep, v1 will *not*:
  * cover CAST / expression / UDF errors  (left to v2 with an
    explicit LogicalErrorBoundary node, only inserted at user-
    declared boundaries — keeping Timo's "pay for what you use"
    principle)
  * introduce any new SQL clause or hint
  * change Calcite
  * support per-error-type routing to multiple sinks
  * support retry / backoff


## 5. Asks

  (1) Timo — would you be open to a brief offline sync on the
      Confluent ProcessFunction + STATEMENT SET implementation?
      If we can use that as the starting point rather than
      reinventing it, we'd save several months and start from a
      design that is already battle-tested in production. Happy to
      do this on Slack / Zoom at your convenience.

  (2) To everyone — does anyone object to using
      `CREATE TABLE WITH ('error-handling.*' = ...)` as the v1
      surface? If silence == consent, I'll proceed to draft a
      FLIP titled
        "FLIP-XXX: Dead Letter Queue for Flink SQL Source/Format"
      (note the deliberate rename — narrower scope, narrower
      title) and post it within ~2 weeks.

  (3) Robin / Ryan — would you be willing to be cited as
      use-case references in the FLIP's Motivation section?
      Concrete production scenarios significantly help the
      vote.


Thanks again for steering this in a much more pragmatic direction.

Best,
FeatZhang

On Mon, Apr 27, 2026 at 10:04 PM Ryan van Huuksloot via dev <
[email protected]> wrote:

> Hi Everyone,
>
> I wanted to comment that this feature would be really useful. I still have
> many questions about how we will implement it scalably.
>
> I'd have to read the spec for the Confluent solution but from your snippet
> Timo, it doesn't sound like the right interface for OSS. You would know
> best though.
> However I understand the concerns with ON ERROR.
>
> Spitballing:
> - Is this something we can inject dynamically with a SQL hint?
>
> Regarding the schema, I favour Bytes.
>
> Thanks for driving this feature—it's a pain point for us!
>
> Ryan van Huuksloot
> Staff Engineer, Infrastructure | Streaming Platform
>
>
> On Mon, Apr 27, 2026 at 8:21 AM Timo Walther <[email protected]> wrote:
>
> > Hi Feat,
> >
> > more declarative error handling makes a lot of sense. In our Cloud
> > product at Confluent, we have already implemented error handling for
> > deserialization errors by adding an additional ProcessFunction closely
> > after the source with side outputs and a wrapping STATEMENT SET under
> > the hood.
> >
> > We are open to contributing this back if it makes sense.
> >
> > However, our current solution only handles errors in the source/format.
> > Of course a global ON ERROR would be more beneficial, however, it is
> > unclear to me how to implement this in a scalable way. A side output has
> > costs (output and input network buffers on the sender and receiver
> > side), we cannot arbitrarily add side outputs to all operators of a SQL
> > pipeline. Given that even relatively easy SQL might be translated into
> > 10+ operators, using side outputs might not scale.
> >
> > Also, sources currently don't support side outputs. This is why we
> > decided for a ProcessFunction after the source for now.
> >
> > Also what is the schema of the Dead Letter Queue? At Confluent we
> > decided for BYTES columns from which data can be re-deserialized.
> >
> > Cheers,
> > Timo
> >
> >
> > On 24.04.26 13:49, FeatZhang wrote:
> > > Hi Flink devs,
> > >
> > > I’d like to start a discussion on a potential enhancement to Flink SQL
> > > around *declarative error handling and side-output semantics*.
> > >
> > > This is an early-stage proposal and I’m mainly looking for feedback on
> > > feasibility, scope, and design direction.
> > > ------------------------------
> > > 1. Background
> > >
> > > In production streaming systems built on Flink SQL, data quality
> > > issues are a persistent challenge, especially in:
> > >
> > >     - Kafka ingestion pipelines
> > >     - CDC streams
> > >     - Log/event processing systems
> > >     - Schema-evolving data sources
> > >     - UDF-heavy transformation pipelines
> > >
> > > Common failure modes include:
> > >
> > >     - malformed JSON / Avro records
> > >     - type mismatches during casting
> > >     - schema drift (missing/extra fields)
> > >     - runtime UDF exceptions
> > >     - partial parsing failures
> > >
> > > ------------------------------
> > > 2. Current Limitations
> > >
> > > Today, Flink SQL provides only partial solutions:
> > >
> > >     - TRY_CAST / defensive expressions
> > >     - json.ignore-parse-errors
> > >     - null-based filtering
> > >     - fallback to DataStream API
> > >
> > > However, these approaches are fragmented and do not provide a unified
> > > abstraction.
> > > Key gaps:
> > >
> > > 2.1 No unified error handling model
> > >
> > > Different failure modes require different mechanisms, with no
> > > consistent cross-operator abstraction.
> > >
> > > 2.2 No SQL-level side output semantics
> > >
> > > Unlike DataStream API (OutputTag), SQL cannot express:
> > >
> > > route invalid records to a separate sink
> > >
> > > 2.3 No structured error observability
> > >
> > > Even when errors are handled:
> > >
> > >     - no error type
> > >     - no raw record preservation
> > >     - no column-level context
> > >     - no standardized routing mechanism
> > >
> > > ------------------------------
> > > 3. Proposal: ON ERROR Clause
> > >
> > > We propose introducing an optional SQL clause:
> > >
> > > INSERT INTO clean_sink
> > > SELECT *
> > > FROM source_table
> > > ON ERROR INTO dirty_sink;
> > >
> > > ------------------------------
> > > 4. Semantics (Initial Proposal)
> > >
> > > The execution semantics are:
> > >
> > >     - Records successfully processed → clean_sink
> > >     - Records failing during:
> > >        - parsing
> > >        - expression evaluation
> > >        - UDF execution
> > >        - operator-level exceptions
> > >
> > > → routed to dirty_sink
> > >
> > > Importantly:
> > >
> > >     - main pipeline execution is NOT interrupted
> > >     - error handling is non-fatal by default
> > >
> > > ------------------------------
> > > 5. Optional Extensions (For Discussion)
> > >
> > > 5.1 Error context propagation
> > >
> > > ON ERROR INTO dirty_sink WITH CONTEXT;
> > >
> > > This would allow enriching the dirty stream with:
> > >
> > >     - raw record
> > >     - error message
> > >     - error type
> > >     - failing field (if available)
> > >     - timestamp
> > >
> > > ------------------------------
> > > 5.2 Error classification routing
> > >
> > > ON ERROR INTO (
> > >    parse_error_sink,
> > >    type_error_sink,
> > >    udf_error_sink
> > > );
> > >
> > > ------------------------------
> > > 5.3 Retry / fallback semantics (future extension)
> > >
> > > ON ERROR
> > >    RETRY 3 TIMES
> > >    BACKOFF 2s
> > >    THEN INTO dirty_sink;
> > >
> > > ------------------------------
> > > 6. Execution Model
> > >
> > > Conceptually, this introduces a *dual-output operator semantics*:
> > >
> > > record
> > >    |
> > >    v
> > > +------------------------+
> > > | ErrorAwareOperator     |
> > > | (try/catch boundary)   |
> > > +------------------------+
> > >       |              |
> > >       |              |
> > >       v              v
> > >   clean stream   dirty stream
> > >
> > > ------------------------------
> > > 7. Relation to Existing Flink Concepts
> > >
> > > This proposal is essentially a SQL-level abstraction over existing
> > > runtime capability:
> > >
> > >     - DataStream Side Output (OutputTag)
> > >     - per-record try/catch execution
> > >     - operator-level failure isolation
> > >
> > > The key gap is *exposing this capability at SQL layer in a
> > > declarative form*.
> > > ------------------------------
> > > 8. Design Considerations & Risks
> > >
> > > I want to explicitly call out a few open concerns:
> > > 8.1 Performance overhead
> > >
> > > Introducing per-record exception boundaries may introduce:
> > >
> > >     - CPU overhead (try/catch)
> > >     - potential JIT inhibition
> > >
> > > We may need optimized execution strategies (e.g. selective
> > > instrumentation or vectorized paths).
> > > ------------------------------
> > > 8.2 Semantics of exactly-once
> > >
> > > Open question:
> > >
> > >     - Should dirty stream be exactly-once, at-least-once, or
> > >       best-effort?
> > >     - Should it participate in checkpointing consistency?
> > >
> > > ------------------------------
> > > 8.3 Scope definition
> > >
> > > We need to clarify whether this applies to:
> > >
> > >     - INSERT INTO only (likely initial scope)
> > >     - or SELECT / views / CTAS / materialized views
> > >
> > > ------------------------------
> > > 8.4 Connector interaction
> > >
> > > Error classification may differ across:
> > >
> > >     - Kafka / filesystem / CDC connectors
> > >     - UDF-defined errors
> > >     - format-level parsing errors
> > >
> > > Standardization may be required.
> > > ------------------------------
> > > 9. Comparison with Existing Systems
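> > >
> > > | System              | SQL-side error routing | Unified model           |
> > > |---------------------|------------------------|-------------------------|
> > > | Flink SQL (current) | ❌                     | ❌                      |
> > > | Spark SQL           | ❌                     | partial                 |
> > > | Trino               | ❌                     | ❌                      |
> > > | Flink DataStream    | ✔                      | ✔ (but not SQL exposed) |
> > >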
> > > ------------------------------
> > > 10. Motivation Summary
> > >
> > > The goal is to bridge the gap between:
> > >
> > >     - DataStream-level expressiveness (side outputs)
> > >     - SQL-level declarative usability
> > >
> > > This would enable:
> > >
> > >     - cleaner production pipelines
> > >     - better data quality isolation
> > >     - improved observability
> > >     - reduced need to drop into DataStream API for error handling
> > >
> > > ------------------------------
> > > 11. Open Questions for the Community
> > >
> > > I’d appreciate feedback on:
> > >
> > >     1. Is ON ERROR too broad for initial scope?
> > >     2. Should this be limited to parsing-level errors first?
> > >     3. What should the default delivery semantics of dirty streams be?
> > >     4. Should error classification be standardized at Flink core level
> > >     or connector level?
> > >     5. Would this overlap too much with existing TRY_* patterns?
> > >     6. Is Calcite the right place to extend this semantics?
> > >
> > > ------------------------------
> > > 12. Next Step (if direction is accepted)
> > >
> > > If there is alignment, potential follow-ups could be:
> > >
> > >     - Formal FLIP document (RFC stage)
> > >     - Calcite grammar extension
> > >     - Planner-level LogicalErrorHandlingNode
> > >     - Runtime ErrorAwareOperator prototype
> > >     - Benchmark on Kafka ingestion workloads
> > >
> > > ------------------------------
> > > Closing
> > >
> > > I believe Flink SQL is missing a *first-class declarative error
> > > handling abstraction*, and this limits its usability in production
> > > data quality scenarios.
> > >
> > > This proposal is an attempt to explore whether we can bring
> > > *DataStream-style side output semantics into SQL in a clean and
> > > composable way*.
> > >
> > > Feedback on feasibility, design boundaries, and alternative
> > > approaches is very welcome.
> > > ------------------------------
> > >
> >
> >
>
