Hi Xuyang,

Thanks a lot for the careful follow-ups; these are exactly the right
questions to ask before we move to a vote. After re-checking the code
paths you pointed at, I want to revise several statements I made in my
previous reply and tighten the FLIP accordingly. I'll go through them
one by one.

1. Semantics at any position in the pipeline
============================================

Fully agree. To make this concrete, I'll add a new sub-section
"Semantics at Different Pipeline Positions" to the FLIP, covering:

- APPLY_WATERMARK on a base table (with or without an existing
  DDL watermark)
- APPLY_WATERMARK on top of a non-materialized view / sub-query
- APPLY_WATERMARK applied multiple times in the same query
- Interaction with TUMBLE / HOP / SESSION / CUMULATE
- Interaction with joins (regular / interval / temporal)

The mental model will be explicitly aligned with the DataStream API:
each APPLY_WATERMARK in SQL corresponds to one
`assignTimestampsAndWatermarks(...)` call in the DataStream pipeline,
applied in the order they appear.

2. Monotonicity validation
==========================

You're right, and I want to correct my earlier reply. Today's
`CREATE TABLE ... WATERMARK FOR ... AS ...` does NOT enforce
monotonicity at the planner level. The planner only checks that:

- the rowtime column exists and is of type TIMESTAMP /
  TIMESTAMP_LTZ,
- the watermark expression is a valid scalar expression over the
  table's schema and resolves to a comparable type.

Monotonicity is a runtime contract: the WatermarkAssignerOperator
only advances its watermark when the newly computed value is larger
than the current one, so the emitted watermarks are guaranteed to be
non-decreasing.

APPLY_WATERMARK will follow exactly the same contract, with no
stricter planner-level monotonicity check. I'll update the FLIP's
"Planner Changes → Validation" section to reflect this.
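
To illustrate the runtime contract, here is a minimal sketch in plain
Java. It is a simplified model, not Flink's actual operator code: the
class name MonotonicWatermarkSketch and its methods are made up for
this mail, and it ignores the periodic emit interval and idleness
handling. It only shows that a per-row watermark expression can yield
a smaller value for an out-of-order row without the emitted watermark
ever moving backward.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

/** Simplified model (not Flink code): emitted watermarks never regress. */
final class MonotonicWatermarkSketch {
    // Stand-in for the compiled watermark expression, e.g. ts -> ts - 5000.
    private final LongUnaryOperator watermarkExpr;
    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    MonotonicWatermarkSketch(LongUnaryOperator watermarkExpr) {
        this.watermarkExpr = watermarkExpr;
    }

    /** Evaluates the expression for one row; emits only if the watermark advances. */
    void processRow(long rowtimeMillis) {
        long candidate = watermarkExpr.applyAsLong(rowtimeMillis);
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
            emitted.add(candidate);
        }
        // A smaller candidate (out-of-order row) is ignored, not an error.
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        MonotonicWatermarkSketch op =
                new MonotonicWatermarkSketch(ts -> ts - 5_000L);
        for (long ts : new long[] {10_000L, 12_000L, 9_000L, 15_000L}) {
            op.processRow(ts);
        }
        System.out.println(op.emitted()); // prints [5000, 7000, 10000]
    }
}
```

The row with rowtime 9000 produces a candidate of 4000, which is
below the current watermark of 7000 and is therefore skipped. This is
why no planner-level proof of monotonicity is needed in this model.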

3. Override timing (clarification)
==================================

I think this was a wording issue on my side rather than a real
design disagreement. Let me restate it:

- APPLY_WATERMARK introduces a dedicated WatermarkAssigner node in
  the plan. If the input already carries a watermark, the new
  assigner sits downstream of the existing one; if it does not, the
  new assigner is simply the first one in the plan.
- At runtime there is no "merge" or "reconciliation": each
  WatermarkAssigner operator independently emits its own watermark
  stream; downstream operators simply observe the watermark from
  the nearest upstream assigner.

This is the same model as calling `assignTimestampsAndWatermarks()`
twice in DataStream: the second call wins because it sits later in
the pipeline, not because of any planner-level magic.

So "planner-level override" was a poor choice of words. The correct
description is: **the planner decides the operator topology; the
runtime emits watermarks according to that topology**, exactly like
DataStream. I'll rephrase the FLIP accordingly and drop the
"override" terminology.
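
The positional model can be sketched in plain Java as well. Again
this is a toy model, not Flink code: ChainedAssignerSketch, Element
and assign(...) are hypothetical names, and the sketch assumes (as
described above) that an assigner does not forward the watermarks it
receives and only emits its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

/** Simplified model (not Flink code) of two chained watermark assigners. */
final class ChainedAssignerSketch {
    /** A stream element: either a record with a rowtime, or a watermark. */
    static final class Element {
        final boolean isWatermark;
        final long timestamp;

        Element(boolean isWatermark, long timestamp) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
        }
    }

    /** Modeling assumption: upstream watermarks are dropped, own ones emitted. */
    static List<Element> assign(List<Element> input, LongUnaryOperator expr) {
        List<Element> out = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (Element e : input) {
            if (e.isWatermark) {
                continue; // the upstream assigner's watermark is not forwarded
            }
            out.add(e);
            long candidate = expr.applyAsLong(e.timestamp);
            if (candidate > current) {
                current = candidate;
                out.add(new Element(true, current));
            }
        }
        return out;
    }

    /** Collects the watermark timestamps visible in a stream. */
    static List<Long> watermarksOf(List<Element> stream) {
        List<Long> result = new ArrayList<>();
        for (Element e : stream) {
            if (e.isWatermark) {
                result.add(e.timestamp);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Element> source =
                List.of(new Element(false, 10_000L), new Element(false, 20_000L));
        // First assigner: plays the role of the DDL watermark (ts - 5s).
        List<Element> first = assign(source, ts -> ts - 5_000L);
        // Second assigner: plays the role of APPLY_WATERMARK (ts - 10s).
        List<Element> second = assign(first, ts -> ts - 10_000L);
        System.out.println(watermarksOf(first));  // prints [5000, 15000]
        System.out.println(watermarksOf(second)); // prints [0, 10000]
    }
}
```

Operators placed after the second assigner only ever see 0 and 10000.
Nothing is merged or reconciled; the result follows purely from where
each assigner sits in the topology.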

4. "Watermark expression evaluation: needs to support arbitrary
expressions"
====================================================================

Apologies, this statement was inaccurate. After re-checking,
StreamExecWatermarkAssigner already evaluates the watermark
expression through the standard `ExprCodeGenerator`, which supports
the same scalar expressions as DDL today (arithmetic on TIMESTAMP /
TIMESTAMP_LTZ, INTERVAL arithmetic, scalar UDFs, etc.).

What APPLY_WATERMARK actually needs from the ExecNode is:

- resolving the rowtime column index from the DESCRIPTOR, since the
  input may not be a base table (it can be a view, a sub-query, or a
  projected relation),
- wiring the watermark expression's input row to the upstream
  operator's output row instead of a TableScan output.

No new expression capability is required. I'll fix this in the FLIP.
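
As a rough illustration of the first bullet, the lookup amounts to
finding the DESCRIPTOR column in whatever row type the upstream
operator produces and checking its type. The helper below is a
hypothetical plain-Java sketch; RowtimeResolverSketch and its
ColumnType enum are invented for this mail and do not mirror the
planner's real classes.

```java
import java.util.List;

/** Hypothetical helper for illustration; not part of Flink or of the FLIP. */
final class RowtimeResolverSketch {
    /** Made-up, minimal column type model. */
    enum ColumnType { TIMESTAMP, TIMESTAMP_LTZ, STRING, BIGINT }

    /**
     * Returns the index of the DESCRIPTOR column in the upstream row type,
     * rejecting unknown columns and non-timestamp types.
     */
    static int resolveRowtimeIndex(
            List<String> fieldNames,
            List<ColumnType> fieldTypes,
            String descriptorColumn) {
        int index = fieldNames.indexOf(descriptorColumn);
        if (index < 0) {
            throw new IllegalArgumentException(
                    "Unknown rowtime column: " + descriptorColumn);
        }
        ColumnType type = fieldTypes.get(index);
        if (type != ColumnType.TIMESTAMP && type != ColumnType.TIMESTAMP_LTZ) {
            throw new IllegalArgumentException(
                    "Rowtime column must be TIMESTAMP / TIMESTAMP_LTZ, got " + type);
        }
        return index;
    }

    public static void main(String[] args) {
        // Output row type of a hypothetical view: (user_id, payload, event_time).
        List<String> names = List.of("user_id", "payload", "event_time");
        List<ColumnType> types = List.of(
                ColumnType.BIGINT, ColumnType.STRING, ColumnType.TIMESTAMP_LTZ);
        System.out.println(resolveRowtimeIndex(names, types, "event_time")); // prints 2
    }
}
```

The point is only that the index comes from the upstream operator's
output row, whatever produced it, rather than from a table schema.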

5. State management
===================

You're right, this should be removed. In the scope of this FLIP the
APPLY_WATERMARK ExecNode is **stateless**, just like the existing
`WatermarkAssignerOperator`. It does not buffer rows and does not
evict late data; late-data handling remains the responsibility of
the downstream window / join operators, exactly as it works today.

The "state management" bullet in my previous reply was speculation
about future watermark strategies (idle source detection, etc.) and
does not belong in this FLIP. I'll drop it.

------------------------------------------------------------
Updated summary of the design after this round
------------------------------------------------------------

- Scope: base tables, non-materialized views, sub-queries; any
  relation position in the query.
- Semantics: aligned with DataStream API
  `assignTimestampsAndWatermarks()`; multiple applications in the
  same query are positional, not "overriding".
- Validation: same contract as today's DDL, i.e. a scalar expression
  on a TIMESTAMP / TIMESTAMP_LTZ rowtime column; no planner-level
  monotonicity check.
- ExecNode: stateless, reuses StreamExecWatermarkAssigner with
  minor wiring changes (rowtime column resolution + non-TableScan
  input handling).
- Out of scope: WatermarkFunction interface, idle-source state,
  runtime-level merge of multiple watermark strategies.

I'll update the FLIP document and PR #27984 to match the points
above, and post a diff summary here once it's done.

Thanks again for pushing on these; the FLIP is much cleaner after
this round.

Best regards,
FeatZhang

On Mon, May 11, 2026 at 10:45 AM Xuyang <[email protected]> wrote:
> Hi FeatZhang. Thanks for the detailed responses. I have a few follow-up
> comments and questions:
>
>
> 1. Support for APPLY_WATERMARK at any node/position
> I generally agree with the direction that APPLY_WATERMARK should be
> applicable at any node and any position — one or more times — similar to
> how the DataStream API allows watermark assignment. However, I think we
> need to clearly articulate the precise behavior/semantics in each scenario
> to reduce user confusion. Aligning the mental model with the DataStream API
> (where users can call assignTimestampsAndWatermarks() at arbitrary points
> in the pipeline) would also help lower the learning curve.
> 2. Monotonicity validation of watermark expressions
> Why do we need to enforce monotonicity guarantees on the watermark
> expression at the planner level? As far as I know, Flink SQL currently does
> NOT perform such validation at the DDL level for CREATE TABLE ... WATERMARK
> FOR ... AS .... What it actually does is ensure at runtime that emitted
> watermarks are non-decreasing. If the existing DDL path does not validate
> monotonicity at planning time, why should APPLY_WATERMARK introduce a
> stricter contract?
> 3. Planner-level watermark override
> Could you elaborate more on why the watermark override must happen at the
> planner level? If I understand correctly, in the DataStream API, users can
> define different watermark strategies at different nodes in the same
> pipeline, and the runtime handles watermark propagation naturally.
> 4. "Watermark expression evaluation: Needs to support arbitrary
> expressions"
> You mentioned that StreamExecWatermarkAssigner currently has limitations
> in watermark expression evaluation and "needs to support arbitrary
> expressions." Could you clarify what the current limitations are exactly?
> Today's CREATE TABLE ... WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> already supports scalar expressions, what additional expression types does
> APPLY_WATERMARK require that are not already supported?
> 5. State management
> You mentioned "State management". Are we proposing to introduce a stateful
> watermark assigner node that evicts late data? This sounds like a
> significant change that goes well beyond the scope of this FLIP. The
> current WatermarkAssigner is stateless, it simply computes and emits
> watermarks without buffering data.
> Looking forward to your clarification!
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
> At 2026-05-09 19:08:32, "熊饶饶" <[email protected]> wrote:
> >Hi Feat Zhang,
> >
> >+1 (non-binding)
> >
> >This is a well-motivated proposal that addresses a real pain point in
> Flink SQL. The inability to define watermarks in views currently forces
> users to either:
> >
> >- Reference underlying table watermarks directly, breaking encapsulation
> >- Create separate physical tables for each watermark strategy, leading to
> >  data duplication
> >
> >The proposed syntax CREATE VIEW ... WATERMARK FOR col AS expr is
> intuitive and aligns naturally with existing DDL watermark semantics. The
> backward-compatible design (storing watermark metadata in catalog options)
> is also a smart choice — it avoids breaking existing views while enabling
> the new capability.
> >
> >The Data Lakehouse / medallion architecture use case is particularly
> compelling. Being able to define watermark strategies at the Silver/Gold
> layer while keeping Bronze as raw data would significantly simplify
> pipeline design for many teams.
> >
> >Looking forward to seeing this move to a formal FLIP!
> >
> >Best regards,
> >Raorao Xiong
> >
> >> On May 7, 2026, at 8:56 PM, FeatZhang <[email protected]> wrote:
> >>
> >> Hi Xuyang,
> >>
> >> Thank you for the thorough review and thoughtful questions.
> >>
> >> *Problem this FLIP aims to solve*: The core goal of this FLIP is to
> allow
> >> watermarks to be defined on *computed columns (and, more generally, on
> any
> >> column produced inside a SQL query) directly in SQL statements*. Today,
> >> watermarks in Flink SQL can only be declared at the CREATE TABLE level
> >> via WATERMARK
> >> FOR ... AS ..., which means the time attribute must be a column visible
> at
> >> the base table DDL. This makes it impossible to attach a watermark to a
> >> timestamp derived inside a query — for example, one computed by string
> >> parsing, JSON extraction, or any expression inside a view or subquery —
> >> without pushing that computation back down into the source table DDL. By
> >> introducing APPLY_WATERMARK as an explicit relational operator that can
> be
> >> applied to *base tables, non-materialized views, and subqueries*, users
> can
> >> assign watermark semantics to any (computed) column produced by a query,
> >> which also addresses the broader motivations listed in the FLIP: broken
> >> layered-pipeline abstractions, lack of per-query / multi-tenant
> watermark
> >> strategies, and the current gap between SQL and the DataStream API.
> >>
> >> Now let me address each of your points:
> >> Support for Non-materialized Views
> >>
> >> You raised an excellent point about the scope of APPLY_WATERMARK in
> layered
> >> architectures.
> >>
> >> *My position*: APPLY_WATERMARK should support base tables,
> non-materialized
> >> views, *and subqueries* (this matches the Goals section of the FLIP).
> >> Here's why:
> >>
> >> - Non-materialized views dissolve into the surrounding plan during
> >> optimization (inline expansion)
> >> - There's no physical "view" node in the execution plan—just a logical
> >> alias
> >> - The watermark becomes a relational transformation applied on top of
> >> the view's / subquery's output
> >>
> >> The key design principle is: *watermark definition is an explicit
> >> relational operator, not attached metadata*.
> >>
> >> I'd also like to clarify the positions in the prior thread to avoid
> >> confusion:
> >>
> >> - *Lincoln* originally proposed APPLY_WATERMARK(table,
> DESCRIPTOR(col),
> >> expr) scoped to base tables only.
> >> - *Timo* raised the concern about blurring the view abstraction ("*A
> >> view usually dissolves into the plan … would a watermark definition
> >> suddenly introduce an optimization barrier? If this is an optimization
> >> barrier, is this still a view or a new concept?*"). This is exactly
> why
> >> this FLIP does *not* attach watermarks to views via CREATE VIEW /
> ALTER
> >> VIEW, and keeps views as pure logical aliases.
> >> - *Gyula* emphasized that watermark assignment should be available on
> >> views and subqueries too, consistent with the DataStream API.
> >>
> >> To address Timo's concern concretely:
> >>
> >> - Watermark semantics are applied at query planning time via an
> explicit
> >> relational operator (APPLY_WATERMARK), not hidden in view/catalog
> metadata.
> >> - No watermark information is persisted into the catalog for views —
> the
> >> catalog stays unchanged (see FLIP "Catalog Changes": *No catalog
> changes
> >> are required*).
> >> - Views continue to dissolve transparently into the plan; the
> >> optimization barrier only appears where APPLY_WATERMARK is explicitly
> used.
> >>
> >> Monotonicity Validation
> >>
> >> Great question! Monotonicity guarantees are essential for watermark
> >> correctness:
> >>
> >> - Watermarks define the boundary of "late" data
> >> - If the watermark expression is not monotonically non-decreasing, the
> >> watermark could move backward
> >> - This would cause data that was previously considered "on-time" to be
> >> treated as late (or vice-versa), breaking event-time semantics
> >>
> >> *Validation requirement*: In line with the FLIP's Planner Changes
> section,
> >> the planner validates that watermark_expression is a valid *scalar*
> expression
> >> over columns of the input schema, and (as with today's CREATE TABLE ...
> >> WATERMARK FOR ... AS ...) the expression must produce a monotonically
> >> non-decreasing value relative to the designated rowtime column.
> >>
> >> Typical valid forms are the same as what's allowed in DDL today, for
> >> example:
> >>
> >> -- Bounded out-of-orderness (most common)
> >> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND)
> >> -- Strictly ascending
> >> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts)
> >>
> >> Note: watermark_expression is a *scalar expression* per the FLIP (not an
> >> aggregate / window function). Richer forms such as user-defined
> watermark
> >> strategies are explicitly out of scope for this FLIP and are tracked as
> a
> >> future WatermarkFunction interface, which also depends on the Calcite
> >> lambda upgrade mentioned by Timo.
> >> Override Timing (Planner vs Runtime)
> >>
> >> You raised a valid concern. Let me clarify the design, which aligns with
> >> the FLIP's "Planner Changes → Interaction with Existing Table
> Watermarks"
> >> section:
> >>
> >> *Current proposal*: Planner-level override
> >>
> >> - During query compilation, when the input to APPLY_WATERMARK already
> >> carries a watermark (e.g., from CREATE TABLE ... WATERMARK), the
> >> LogicalWatermarkAssigner node produced by APPLY_WATERMARK *overrides*
> the
> >> upstream watermark strategy.
> >> - When the input has no watermark (e.g., a view or a subquery),
> >> APPLY_WATERMARK introduces a new one.
> >> - This makes override behavior explicit in the plan and keeps room for
> >> standard optimizations.
> >>
> >> *Why not runtime-level override*:
> >>
> >> 1. Planner-level override keeps watermark semantics a first-class,
> >> visible part of the plan (consistent with how VECTOR_SEARCH /
> ML_PREDICT
> >> are modeled as specialized ExecNodes in the FLIP).
> >> 2. The override point is deterministic and inspectable via EXPLAIN.
> >> 3. Simpler execution model — no dual-watermark reconciliation at
> runtime.
> >>
> >> If concrete use cases for a runtime-level override surface later, we
> could
> >> revisit this via a hint (e.g., /*+ RUNTIME_OVERRIDE */), but it's not
> part
> >> of this FLIP.
> >> Relationship with BuiltInProcessTableFunction
> >>
> >> Good observation. APPLY_WATERMARK is declared as a built-in PTF at the
> SQL
> >> surface, similar in spirit to TO_CHANGELOG / FROM_CHANGELOG, but the
> FLIP
> >> intentionally maps it to a *specialized ExecNode* rather than a generic
> PTF
> >> runtime — the same pattern used by VECTOR_SEARCH and ML_PREDICT.
> >>
> >> *Option A: Reuse the generic BuiltInProcessTableFunction runtime*
> >>
> >> - Pros: Consistent with other built-in PTFs at the runtime layer.
> >> - Cons: Watermark assignment is not a row-transforming PTF — it
> changes
> >> stream metadata (time attribute + watermark strategy). Forcing it
> through
> >> the generic PTF runtime would require extending the PTF contract with
> >> watermark semantics.
> >>
> >> *Option B: Dedicated LogicalWatermarkAssigner + specialized ExecNode
> (the
> >> FLIP's choice)*
> >>
> >> - Pros: Keeps watermark semantics a first-class citizen in the
> planner;
> >> cleanly integrates with watermark propagation rules; no need to
> overload
> >> the PTF contract; same pattern as VECTOR_SEARCH / ML_PREDICT already
> >> established in Flink.
> >> - Cons: A new dedicated node, though that cost is small compared to
> the
> >> semantic clarity.
> >>
> >> *Current decision*: Option B, as stated in the FLIP ("*APPLY_WATERMARK
> >> compiles to a specialized ExecNode --- similar to how VECTOR_SEARCH and
> >> ML_PREDICT are handled*"). Open to revisiting based on community
> feedback.
> >> StreamExecWatermarkAssigner Sufficiency
> >>
> >> For the physical implementation:
> >>
> >> *Yes, StreamExecWatermarkAssigner should be sufficient*, with some
> >> modifications:
> >>
> >> 1. *Input handling*: Currently assumes direct table scan; needs to
> >> handle APPLY_WATERMARK's column mapping
> >> 2. *Watermark expression evaluation*: Needs to support arbitrary
> >> expressions (currently limited)
> >> 3. *State management*: May need additional state for handling
> >> out-of-order events
> >>
> >> The key insight is that APPLY_WATERMARK conceptually translates to:
> >>
> >> TableScan -> Calc (expression evaluation) -> WatermarkAssigner
> >>
> >> StreamExecWatermarkAssigner handles the last step; the Calc step handles
> >> the expression.
> >> ------------------------------
> >> Summary of Proposed Responses
> >>
> >> - *Scope of input*: Support base tables, non-materialized views *and
> >> subqueries* (per FLIP Goals); views/catalog semantics stay unchanged.
> >> - *Monotonicity*: Validate watermark_expression as a scalar
> expression;
> >> same monotonicity contract as today's DDL watermarks.
> >> - *Override timing*: Planner-level override at
> LogicalWatermarkAssigner;
> >> potential /*+ RUNTIME_OVERRIDE */ hint as future work.
> >> - *PTF reuse*: Dedicated LogicalWatermarkAssigner + specialized
> ExecNode
> >> (same pattern as VECTOR_SEARCH / ML_PREDICT).
> >> - *ExecNode sufficiency*: StreamExecWatermarkAssigner is sufficient
> with
> >> minor modifications (input handling + expression evaluation).
> >>
> >> ------------------------------
> >>
> >> Looking forward to further discussion!
> >>
> >> Best regards,
> >> FeatZhang
> >>
> >>
> >> On Wed, May 6, 2026 at 4:39 PM Xuyang <[email protected]> wrote:
> >>
> >>> Hi, FeatZhang. Thanks for driving this discussion. I've read through
> the
> >>> full FLIP and the mailing list context, and I have a few questions:
> >>> 1. If I understand correctly, in a Layered Data Architecture,
> >>> silver_events would typically be a table, a materialized view, or a
> >>> materialized table. From the mailing list discussion, it seems like no
> >>> consensus was reached on this point. I think we still need to consider
> >>> whether APPLY_WATERMARK should be allowed on (non-materialized) views.
> >>> 2. In the Planner Changes section under Logical Plan, could you
> elaborate
> >>> on why monotonicity guarantees need to be ensured for the watermark
> >>> expression validation?
> >>> 3. (nit) In the Watermark Override part under Planner Changes,
> shouldn't
> >>> the override of the upstream watermark happen at runtime rather than
> at the
> >>> planner level?
> >>> 4. I feel that APPLY_WATERMARK is quite similar to TO_CHANGELOG and
> >>> FROM_CHANGELOG. Is what we actually need just a
> >>> BuiltInProcessTableFunction? That way, we would only need to further
> extend
> >>> ProcessTableFunction to support this.
> >>> 5. If we choose to translate APPLY_WATERMARK into a specialized
> ExecNode
> >>> (similar to VECTOR_SEARCH and ML_PREDICT), would the existing
> >>> StreamExecWatermarkAssigner be sufficient for this purpose?
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>>
> >>> Best!
> >>> Xuyang
> >>>
> >>>
> >>>
> >>> At 2026-04-21 22:22:33, "FeatZhang" <[email protected]> wrote:
> >>>> Hi everyone,
> >>>>
> >>>> Thank you for the feedback and discussions on the initial proposal.
> I've
> >>>> revised the FLIP based on the community's input and would like to
> share
> >>>> the updated version.
> >>>>
> >>>>
> >>>> FLIP-XXX: Support Flexible Watermark Assignment via Built-in Function
> >>>> <
> >>>
> https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA
> >>>>
> >>>>
> >>>>
> >>>> KEY UPDATES
> >>>> ===========
> >>>>
> >>>> The proposal has evolved from "Support Watermark Definition in SQL
> Views"
> >>>> to a more flexible and powerful approach: FLIP-XXX: Support Flexible
> >>>> Watermark Assignment via APPLY_WATERMARK Function.
> >>>>
> >>>> What's Changed:
> >>>>
> >>>> 1. Broader Scope: Instead of limiting watermark definitions to SQL
> views
> >>>> only, the new proposal introduces a built-in table function
> >>>> APPLY_WATERMARK that works with:
> >>>> - Base tables
> >>>> - Views (both regular and materialized)
> >>>> - Subqueries
> >>>> - Any table-valued expressions
> >>>>
> >>>> 2. More Flexible Design: The function-based approach provides:
> >>>> - Dynamic watermark assignment at query time without modifying
> catalog
> >>>> metadata
> >>>> - Override capability for existing watermark strategies
> >>>> - Composability with other SQL operations
> >>>> - No need for DDL changes or catalog write permissions
> >>>>
> >>>> 3. Better SQL Semantics: Using a table function follows SQL standard
> >>>> patterns and integrates naturally with Flink's existing function
> >>>> ecosystem.
> >>>>
> >>>> UPDATED FLIP DOCUMENT
> >>>> =====================
> >>>>
> >>>> The revised FLIP is now available at:
> >>>> https://iwiki.woa.com/p/4019879693
> >>>>
> >>>> Key sections include:
> >>>> - Motivation and use cases
> >>>> - Public interfaces and SQL syntax
> >>>> - Implementation plan
> >>>> - Compatibility analysis
> >>>> - Test plan
> >>>>
> >>>> EXAMPLE USAGE
> >>>> =============
> >>>>
> >>>> -- Apply watermark to a view
> >>>> SELECT * FROM APPLY_WATERMARK(my_view, DESCRIPTOR(event_time),
> >>>> event_time - INTERVAL '5' SECOND);
> >>>> -- Override existing watermark strategy
> >>>> SELECT * FROM APPLY_WATERMARK(my_table_with_watermark, DESCRIPTOR(ts),
> >>>> ts - INTERVAL '10' SECOND -- Different from DDL watermark
> >>>> );
> >>>> -- Use in complex queries
> >>>> SELECT window_start,
> >>>> window_end,
> >>>> COUNT(*)
> >>>> FROM TABLE(TUMBLE(TABLE APPLY_WATERMARK(orders,
> >>>> DESCRIPTOR(order_time), order_time - INTERVAL '5' SECOND),
> >>>> DESCRIPTOR(order_time), INTERVAL '1' HOUR))
> >>>> GROUP BY window_start,
> >>>> window_end;
> >>>>
> >>>>
> >>>> IMPLEMENTATION PROGRESS
> >>>> =======================
> >>>>
> >>>> I've also opened a draft PR #27984 with the initial implementation:
> >>>> - Core built-in function definition
> >>>> - SQL-to-RelNode conversion rules
> >>>> - Physical plan integration
> >>>> - Unit tests and documentation (English + Chinese)
> >>>>
> >>>> The PR is available at:
> >>>> https://github.com/apache/flink/pull/27984
> >>>>
> >>>> REQUEST FOR FEEDBACK
> >>>> ====================
> >>>>
> >>>> I would appreciate your thoughts on:
> >>>>
> >>>> 1. Function naming: Is APPLY_WATERMARK clear and intuitive?
> >>>> (Alternative considered: WITH_WATERMARK, SET_WATERMARK)
> >>>>
> >>>> 2. DESCRIPTOR syntax: Using DESCRIPTOR(column_name) to specify the
> >>>> rowtime column—does this align well with Flink's existing patterns?
> >>>>
> >>>> 3. Override behavior: Should APPLY_WATERMARK always override existing
> >>>> watermarks, or should we provide a mode parameter
> >>>> (e.g., OVERRIDE, MERGE)?
> >>>>
> >>>> 4. Performance considerations: Any concerns about the function-based
> >>>> approach vs. catalog-level watermark definitions?
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> Looking forward to your valuable feedback!
> >>>>
> >>>> Best regards,
> >>>> FeatZhang
> >>>>
> >>>> On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]>
> wrote:
> >>>>
> >>>>> Hi everyone,
> >>>>>
> >>>>> I think we all agree that we clearly want this functionality, just
> the
> >>>>> "how" needs to be discussed. I also like Lincoln’s suggestion of
> >>>>> introducing a built-in PTF for this, I had similar ideas in mind.
> >>>>>
> >>>>> There are two issues with an APPLY_WATERMARK function, but both are
> >>>>> on the short-term roadmap:
> >>>>>
> >>>>> 1) This function would need to be a function that takes an
> expression.
> >>>>> Ideally as a lambda function. Newer Calcite versions have already
> lambda
> >>>>> expression support. At Confluent we were planning to work on a
> Calcite
> >>>>> upgrade this quarter especially to get lambda support in and improve
> >>>>> built-in functions that work on collections.
> >>>>>
> >>>>> 2) User-defined PTFs are currently not able to emit watermarks. We
> could
> >>>>> introduce a new interface WatermarkFunction (similar to
> >>>>> ChangelogFunction) that would offer this to everyone. Alternatively,
> we
> >>>>> could only use the PTF signature, but translate to a specialized
> >>>>> ExecNode similar to how we do it for VECTOR_SEARCH and ML_PREDICT.
> >>>>>
> >>>>> In any case, even if we go with the function approach, we definitely
> >>>>> need a full FLIP on this.
> >>>>>
> >>>>> Thanks,
> >>>>> Timo
> >>>>>
> >>>>> On 12.02.26 08:25, Gyula Fóra wrote:
> >>>>>> Hi All!
> >>>>>> I would like to chime in here quickly from a slightly different
> angle.
> >>>>>> While I am the first to admit that I cannot grasp all the planning /
> >>>>>> conceptual implications, I also feel the need for more flexible
> >>> watermark
> >>>>>> handling as suggested by Feat.
> >>>>>>
> >>>>>> Anything that can only be applied to base/catalog tables is very
> >>> limiting
> >>>>>> from a usability perspective. Watermarks feel like they should be a
> >>>>> simple
> >>>>>> function that you can apply on a column/table as part of a
> query/view.
> >>>>> For
> >>>>>> example extract timestamp from a string convert to TS -> apply
> >>> watermark
> >>>>>> etc.
> >>>>>>
> >>>>>> Users often receive the tables/catalogs as given and can only write
> >>>>>> queries.
> >>>>>>
> >>>>>> Fixing this would eliminate a long standing disconnect between the
> >>>>>> datastream api flexible watermark handling compared to the currently
> >>> very
> >>>>>> restrictive SQL approach.
> >>>>>>
> >>>>>> Cheers
> >>>>>> Gyula
> >>>>>>
> >>>>>> On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]>
> >>> wrote:
> >>>>>>
> >>>>>>> Hi Timo, Lincoln,
> >>>>>>>
> >>>>>>> Thank you both for the detailed feedback.
> >>>>>>>
> >>>>>>> I agree with the concern that non-materialized SQL views should
> >>> remain a
> >>>>>>> pure logical abstraction. Introducing watermark definitions
> directly
> >>>>>>> into CREATE
> >>>>>>> VIEW or ALTER VIEW could blur the boundary between logical aliasing
> >>> and
> >>>>>>> physical planning semantics, especially considering optimization
> >>>>> barriers
> >>>>>>> and watermark propagation behavior.
> >>>>>>>
> >>>>>>> Lincoln’s suggestion of introducing a built-in function such as:
> >>>>>>>
> >>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
> >>> watermark_expression)
> >>>>>>>
> >>>>>>> is a cleaner direction. It keeps watermark definition as an
> explicit
> >>>>>>> relational transformation rather than attaching additional
> semantics
> >>> to
> >>>>>>> views.
> >>>>>>>
> >>>>>>> However, to fully address the original use cases (especially
> logical
> >>>>> reuse
> >>>>>>> and layered lakehouse architectures), I propose that the table
> >>> parameter
> >>>>>>> should support:
> >>>>>>>
> >>>>>>> - Base tables
> >>>>>>> - Non-materialized views
> >>>>>>>
> >>>>>>> If APPLY_WATERMARK can accept both, we can:
> >>>>>>>
> >>>>>>> - Preserve the conceptual purity of SQL views
> >>>>>>> - Avoid redefining view semantics
> >>>>>>> - Still enable logical reuse via views
> >>>>>>> - Allow different watermark strategies over the same logical
> >>>>> relation
> >>>>>>>
> >>>>>>> In other words, watermark definition becomes an explicit relational
> >>>>>>> operator applied on top of any logical relation, instead of being
> >>>>> embedded
> >>>>>>> into the view definition itself.
> >>>>>>>
> >>>>>>> From a planner perspective, this keeps the model consistent:
> >>>>>>>
> >>>>>>> - The function expands into a relational node
> >>>>>>> - No optimization barrier is introduced by views
> >>>>>>> - Watermark handling remains part of the logical plan
> >>> transformation
> >>>>>>>
> >>>>>>> I will prepare a PR to prototype APPLY_WATERMARK with support for
> >>> both
> >>>>> base
> >>>>>>> tables and non-materialized views, and share it for further
> >>> discussion.
> >>>>>>>
> >>>>>>> Looking forward to your thoughts.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Feat
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Feb 12, 2026 at 12:19 PM Lincoln Lee <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Agree with Timo’s point regarding the conceptual semantics. We
> >>> should
> >>>>> not
> >>>>>>>> directly extend non-materialized views with additional watermark
> >>>>>>>> definitions.
> >>>>>>>>
> >>>>>>>> Regarding the use case mentioned by Feat, defining different
> >>> watermark
> >>>>>>>> strategies
> >>>>>>>> for the same data source, especially in the case of catalog
> tables,
> >>> we
> >>>>>>> are
> >>>>>>>> exploring a possible solution introducing a built-in function:
> >>>>>>>> ```sql
> >>>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
> >>>>> watermark_expression)
> >>>>>>>> ```
> >>>>>>>> This function only supports base tables as input and does not
> >>>>>>>> support views, subqueries or derived relations.
> >>>>>>>>
> >>>>>>>> This would address a meaningful subset of the identified use cases
> >>>>>>> without
> >>>>>>>> redefining the role of SQL views.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Lincoln Lee
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Feb 11, 2026 at 11:29 PM Timo Walther <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Feat,
> >>>>>>>>>
> >>>>>>>>> thanks for proposing this FLIP. We had similar discussions in the
> >>>>> past,
> >>>>>>>>> but so far could never reach consensus.
> >>>>>>>>>
> >>>>>>>>> SQL views are actually a very simple concept, they just give SQL
> >>> text
> >>>>>>> an
> >>>>>>>>> alias. A view has no other properties except for the view
> >>> definition.
> >>>>>>>>> Everything else is dynamically computed when the SQL text is
> >>> inserted
> >>>>>>>>> into the larger plan. A view is never evaluated without the
> >>>>> surrounding
> >>>>>>>>> plan.
> >>>>>>>>>
> >>>>>>>>> Watermarks in the middle of a pipeline raise a couple of tricky
> >>>>> issues:
> >>>>>>>>>
> >>>>>>>>> - What if the upstream table is updating, how would you deal with
> >>>>>>>>> watermarks in the downstream view?
> >>>>>>>>> - What if the upstream table emits already watermarks? Would the
> >>> view
> >>>>>>>>> catch them and discard this information?
> >>>>>>>>> - A view usually dissolves into the plan (e.g. via projection or
> >>>>> filter
> >>>>>>>>> pushdown). Would a watermark definition suddenly introduce an
> >>>>>>>>> optimization barrier? If this is an optimization barrier, is this
> >>>>> still
> >>>>>>>>> a view or a new concept? E.g. a "materialized view" or
> "pre-planned
> >>>>>>>> view"?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Timo
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 11.02.26 03:51, FeatZhang wrote:
> >>>>>>>>>> Hi Flink Community,
> >>>>>>>>>>
> >>>>>>>>>> I'd like to propose adding watermark support for SQL Views in
> >>> Flink
> >>>>>>> to
> >>>>>>>>>> better support event-time processing scenarios.
> >>>>>>>>>> Problem Statement
> >>>>>>>>>>
> >>>>>>>>>> Currently, Flink SQL views cannot define watermarks. This
> >>>>>>>>>> limitation creates several challenges:
> >>>>>>>>>>
> >>>>>>>>>>    - *Broken Abstraction*: Users must reference underlying table
> >>>>>>>>>>      watermarks directly, exposing implementation details
> >>>>>>>>>>    - *No Flexibility*: Cannot define different watermark
> >>>>>>>>>>      strategies for different use cases on the same data source
> >>>>>>>>>>    - *Limited Architecture Support*: Incompatible with modern
> >>>>>>>>>>      layered data architectures (Bronze/Silver/Gold medallion
> >>>>>>>>>>      pattern)
> >>>>>>>>>>
> >>>>>>>>>> For example, in multi-tenant scenarios, different tenants may
> >>>>>>>>>> require different lateness tolerance, but currently we cannot
> >>>>>>>>>> create views with different watermark strategies on the same
> >>>>>>>>>> source table.
> >>>>>>>>>> Proposed Solution
> >>>>>>>>>>
> >>>>>>>>>> I propose adding two SQL syntax options to support watermark
> >>>>>>>>>> definitions in views:
> >>>>>>>>>>
> >>>>>>>>>> *Option 1: CREATE VIEW with WATERMARK*
> >>>>>>>>>>
> >>>>>>>>>> CREATE VIEW user_activity
> >>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> >>>>>>>>>> AS SELECT user_id, event_time, action FROM raw_events;
> >>>>>>>>>>
> >>>>>>>>>> *Option 2: ALTER VIEW SET WATERMARK*
> >>>>>>>>>>
> >>>>>>>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS
> >>>>>>>>>> event_time - INTERVAL '5' SECOND;
> >>>>>>>>>>
> >>>>>>>>>> Key Design Aspects
> >>>>>>>>>>
> >>>>>>>>>>    - *Backward Compatibility*: Watermark stored as optional
> >>>>>>>>>>      metadata in view options; existing views continue to work
> >>>>>>>>>>      unchanged
> >>>>>>>>>>    - *Validation*: Watermark column must exist in view schema and
> >>>>>>>>>>      be of TIMESTAMP/TIMESTAMP_LTZ type
> >>>>>>>>>>    - *Storage*: Watermark metadata persists in catalog options map
> >>>>>>>>>>      (works with all catalog implementations)
> >>>>>>>>>>    - *Propagation*: Follows existing Flink watermark propagation
> >>>>>>>>>>      rules in joins and nested views
> >>>>>>>>>>
> >>>>>>>>>> Use Case Example: Data Lakehouse Architecture
> >>>>>>>>>>
> >>>>>>>>>> -- Bronze: Raw data (no watermark)
> >>>>>>>>>> CREATE TABLE bronze_events (
> >>>>>>>>>>   raw_data STRING,
> >>>>>>>>>>   ingestion_time TIMESTAMP(3)
> >>>>>>>>>> ) WITH (...);
> >>>>>>>>>>
> >>>>>>>>>> -- Silver: Cleaned data with watermark
> >>>>>>>>>> CREATE VIEW silver_events
> >>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> >>>>>>>>>> AS SELECT
> >>>>>>>>>>   CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3))
> >>>>>>>>>>     AS event_time,
> >>>>>>>>>>   JSON_VALUE(raw_data, '$.user_id') AS user_id
> >>>>>>>>>> FROM bronze_events
> >>>>>>>>>> WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;
> >>>>>>>>>>
> >>>>>>>>>> -- Gold: Aggregations
> >>>>>>>>>> SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
> >>>>>>>>>> FROM silver_events
> >>>>>>>>>> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
> >>>>>>>>>>
> >>>>>>>>>> Reference Materials
> >>>>>>>>>>
> >>>>>>>>>>    - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
> >>>>>>>>>>      <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
> >>>>>>>>>>    - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
> >>>>>>>>>>    - Implementation POC:
> >>>>>>>>>>       - [FLINK-39062][table] Support WATERMARK clause in CREATE
> >>>>>>>>>>         VIEW statement
> >>>>>>>>>>         <https://github.com/apache/flink/pull/27571>
> >>>>>>>>>>       - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK
> >>>>>>>>>>         syntax
> >>>>>>>>>>         <https://github.com/apache/flink/pull/27570>
> >>>>>>>>>>
> >>>>>>>>>> Implementation Timeline
> >>>>>>>>>>
> >>>>>>>>>> Estimated 6-8 weeks covering parser layer, planner layer, catalog
> >>>>>>>>>> integration, and comprehensive testing.
> >>>>>>>>>> Request for Feedback
> >>>>>>>>>>
> >>>>>>>>>> This enhancement would significantly improve Flink's support for
> >>>>>>>>>> layered data architectures and flexible event-time processing. I'm
> >>>>>>>>>> happy to provide more details or start a formal FLIP process if
> >>>>>>>>>> the community sees value in this proposal.
> >>>>>>>>>>
> >>>>>>>>>> Looking forward to the community's feedback!
> >>>>>>>>>>
> >>>>>>>>>> Best regards,
> >>>>>>>>>>
> >>>>>>>>>> Feat Zhang
> >>>>>>>>>>
> >>>>>>>>>> FLIP-XXX: Support Watermark in Flink SQL View
> >>>>>>>>>> <https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y>
> >>>>>>>>>>