Hi Flink devs,

I’d like to start a discussion on a potential enhancement to Flink SQL
around *declarative error handling and side-output semantics*.

This is an early-stage proposal and I’m mainly looking for feedback on
feasibility, scope, and design direction.
------------------------------
1. Background

In production streaming systems built on Flink SQL, data quality issues are
a persistent challenge, especially in:

   - Kafka ingestion pipelines
   - CDC streams
   - Log/event processing systems
   - Schema-evolving data sources
   - UDF-heavy transformation pipelines

Common failure modes include:

   - malformed JSON / Avro records
   - type mismatches during casting
   - schema drift (missing/extra fields)
   - runtime UDF exceptions
   - partial parsing failures

------------------------------
2. Current Limitations

Today, Flink SQL provides only partial solutions:

   - TRY_CAST / defensive expressions
   - json.ignore-parse-errors
   - null-based filtering
   - fallback to DataStream API

However, these approaches are fragmented and do not provide a unified
abstraction.
Key gaps: 2.1 No unified error handling model

Different failure modes require different mechanisms, with no consistent
cross-operator abstraction.
2.2 No SQL-level side output semantics

Unlike DataStream API (OutputTag), SQL cannot express:

route invalid records to a separate sink

2.3 No structured error observability

Even when errors are handled:

   - no error type
   - no raw record preservation
   - no column-level context
   - no standardized routing mechanism

------------------------------
3. Proposal: ON ERROR Clause

We propose introducing an optional SQL clause:

INSERT INTO clean_sink
SELECT *
FROM source_table
ON ERROR INTO dirty_sink;

------------------------------
4. Semantics (Initial Proposal)

The execution semantics are:

   - Records successfully processed → clean_sink
   - Records failing during:
      - parsing
      - expression evaluation
      - UDF execution
      - operator-level exceptions

→ routed to dirty_sink

Importantly:

   - main pipeline execution is NOT interrupted
   - error handling is non-fatal by default

------------------------------
5. Optional Extensions (For Discussion) 5.1 Error context propagation

ON ERROR INTO dirty_sink WITH CONTEXT;

This would allow enriching the dirty stream with:

   - raw record
   - error message
   - error type
   - failing field (if available)
   - timestamp

------------------------------
5.2 Error classification routing

ON ERROR INTO (
  parse_error_sink,
  type_error_sink,
  udf_error_sink
);

------------------------------
5.3 Retry / fallback semantics (future extension)

ON ERROR
  RETRY 3 TIMES
  BACKOFF 2s
  THEN INTO dirty_sink;

------------------------------
6. Execution Model

Conceptually, this introduces a *dual-output operator semantics*:

record
  |
  v
+------------------------+
| ErrorAwareOperator     |
| (try/catch boundary)   |
+------------------------+
     |              |
     |              |
     v              v
 clean stream   dirty stream

------------------------------
7. Relation to Existing Flink Concepts

This proposal is essentially a SQL-level abstraction over existing runtime
capability:

   - DataStream Side Output (OutputTag)
   - per-record try/catch execution
   - operator-level failure isolation

The key gap is *exposing this capability at SQL layer in a declarative form*
.
------------------------------
8. Design Considerations & Risks

I want to explicitly call out a few open concerns:
8.1 Performance overhead

Introducing per-record exception boundaries may introduce:

   - CPU overhead (try/catch)
   - potential JIT inhibition

We may need optimized execution strategies (e.g. selective instrumentation
or vectorized paths).
------------------------------
8.2 Semantics of exactly-once

Open question:

   - Should dirty stream be exactly-once, at-least-once, or best-effort?
   - Should it participate in checkpointing consistency?

------------------------------
8.3 Scope definition

We need to clarify whether this applies to:

   - INSERT INTO only (likely initial scope)
   - or SELECT / views / CTAS / materialized views

------------------------------
8.4 Connector interaction

Error classification may differ across:

   - Kafka / filesystem / CDC connectors
   - UDF-defined errors
   - format-level parsing errors

Standardization may be required.
------------------------------
9. Comparison with Existing Systems
System SQL-side error routing Unified model
Flink SQL (current) ❌ ❌
Spark SQL ❌ partial
Trino ❌ ❌
Flink DataStream ✔ ✔ (but not SQL exposed)
------------------------------
10. Motivation Summary

The goal is to bridge the gap between:

   - DataStream-level expressiveness (side outputs)
   - SQL-level declarative usability

This would enable:

   - cleaner production pipelines
   - better data quality isolation
   - improved observability
   - reduced need to drop into DataStream API for error handling

------------------------------
11. Open Questions for the Community

I’d appreciate feedback on:

   1. Is ON ERROR too broad for initial scope?
   2. Should this be limited to parsing-level errors first?
   3. What should the default delivery semantics of dirty streams be?
   4. Should error classification be standardized at Flink core level or
   connector level?
   5. Would this overlap too much with existing TRY_* patterns?
   6. Is Calcite the right place to extend this semantics?

------------------------------
12. Next Step (if direction is accepted)

If there is alignment, potential follow-ups could be:

   - Formal FLIP document (RFC stage)
   - Calcite grammar extension
   - Planner-level LogicalErrorHandlingNode
   - Runtime ErrorAwareOperator prototype
   - Benchmark on Kafka ingestion workloads

------------------------------
Closing

I believe Flink SQL is missing a *first-class declarative error handling
abstraction*, and this limits its usability in production data quality
scenarios.

This proposal is an attempt to explore whether we can bring *DataStream-style
side output semantics into SQL in a clean and composable way*.

Feedback on feasibility, design boundaries, and alternative approaches is
very welcome.
------------------------------

Reply via email to