featzhang opened a new pull request, #27984:
URL: https://github.com/apache/flink/pull/27984
## What is the purpose of the change
This PR implements the `APPLY_WATERMARK` built-in function as proposed in
the community discussion thread, enabling flexible watermark assignment on
tables, views, and subqueries in Flink SQL.
**Motivation:**
Currently, Flink SQL requires watermarks to be defined in DDL (the
`WATERMARK FOR ... AS ...` clause of `CREATE TABLE`), which limits flexibility when:
- Working with catalog tables without DDL modification permissions
- Using views or complex subqueries where DDL is not applicable
- Dynamically adjusting watermark strategies without schema changes
**Solution:**
Introduce an `APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expr)`
function that:
- Accepts any table expression (base table, view, or subquery)
- Assigns or overrides the watermark on the specified rowtime column
- Validates at compile time that the column exists and has type TIMESTAMP or
TIMESTAMP_LTZ
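The compile-time check in the last bullet can be pictured with a small standalone sketch. This is not the code in this PR: the class `ApplyWatermarkValidationSketch`, the `validate` method, and the map-based schema (column name to simplified type name) are hypothetical and use only the JDK, so the actual operand checking in the planner will look different.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Illustrative only: mimics the described operand checks without any Flink classes. */
final class ApplyWatermarkValidationSketch {

    // Type names accepted for the rowtime column, per the description above.
    private static final Set<String> ROWTIME_TYPES = Set.of("TIMESTAMP", "TIMESTAMP_LTZ");

    /**
     * Returns null when the column is a valid rowtime candidate, otherwise an error message.
     * The schema maps column name to a simplified type name (an assumption of this sketch).
     */
    static String validate(Map<String, String> schema, String rowtimeColumn) {
        String type = schema.get(rowtimeColumn);
        if (type == null) {
            return "Column '" + rowtimeColumn + "' does not exist in the input table";
        }
        if (!ROWTIME_TYPES.contains(type)) {
            return "Column '" + rowtimeColumn
                    + "' must be TIMESTAMP or TIMESTAMP_LTZ but is " + type;
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical schema for the `orders` table used in the examples.
        Map<String, String> orders = new LinkedHashMap<>();
        orders.put("order_id", "BIGINT");
        orders.put("amount", "DECIMAL");
        orders.put("order_time", "TIMESTAMP");

        System.out.println(validate(orders, "order_time")); // valid column: no error
        System.out.println(validate(orders, "amount"));     // wrong type: error message
        System.out.println(validate(orders, "ship_time"));  // missing column: error message
    }
}
```

Failing at validation time rather than at runtime means a query with a bad `DESCRIPTOR(...)` argument is rejected before any job is submitted.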
## Brief change log
- Add `SqlApplyWatermarkFunction` as a new built-in SQL function
- Add `LogicalApplyWatermarkRule` to convert SQL function calls to logical
plan nodes
- Extend `FlinkLogicalWatermarkAssigner` to support the SQL function path
- Update `StreamPhysicalWatermarkAssigner` to integrate with the existing
watermark infrastructure
- Add unit tests for function registration and validation
## Verifying this change
This change added tests and can be verified as follows:
- Added `ApplyWatermarkFunctionTest` for function registration and operand
validation
- Existing watermark-related tests still pass (DDL-based watermarks remain
unchanged)
- Manual verification with example queries (see below)
## Example Usage
```sql
-- Apply watermark to a catalog table
SELECT * FROM APPLY_WATERMARK(
  orders,
  DESCRIPTOR(order_time),
  order_time - INTERVAL '5' SECOND
);

-- Override watermark on a view
CREATE VIEW recent_orders AS
SELECT * FROM orders
WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' DAY;

SELECT * FROM APPLY_WATERMARK(
  recent_orders,
  DESCRIPTOR(order_time),
  order_time - INTERVAL '10' SECOND
);

-- Use with subquery
SELECT * FROM APPLY_WATERMARK(
  (SELECT * FROM orders WHERE amount > 100),
  DESCRIPTOR(order_time),
  order_time - INTERVAL '3' SECOND
);
```
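For readers less familiar with watermark expressions, the sketch below shows what `order_time - INTERVAL '5' SECOND` amounts to. It assumes `APPLY_WATERMARK` follows the same semantics as DDL-defined watermarks (the expression is evaluated per row and the watermark only moves forward), which this description implies but does not spell out. The class `BoundedDelayWatermarkSketch` and its `onRow` method are hypothetical, plain-JDK stand-ins and not part of this PR or of Flink.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/** Illustrative only: tracks a "rowtime - delay" watermark that never moves backwards. */
final class BoundedDelayWatermarkSketch {
    private final Duration delay;
    private long currentWatermark = Long.MIN_VALUE;

    BoundedDelayWatermarkSketch(Duration delay) {
        this.delay = delay;
    }

    /** Evaluates "rowtime - delay" for one row and keeps the maximum seen so far. */
    long onRow(Instant rowtime) {
        long candidate = rowtime.minus(delay).toEpochMilli();
        currentWatermark = Math.max(currentWatermark, candidate);
        return currentWatermark;
    }

    public static void main(String[] args) {
        // Corresponds to: order_time - INTERVAL '5' SECOND
        BoundedDelayWatermarkSketch wm =
                new BoundedDelayWatermarkSketch(Duration.ofSeconds(5));

        List<Instant> orderTimes = List.of(
                Instant.parse("2024-01-01T00:00:10Z"),
                Instant.parse("2024-01-01T00:00:07Z"), // arrives out of order
                Instant.parse("2024-01-01T00:00:20Z"));

        for (Instant t : orderTimes) {
            System.out.println(t + " -> watermark " + Instant.ofEpochMilli(wm.onRow(t)));
        }
    }
}
```

In this run the out-of-order row at 00:00:07 leaves the watermark at 00:00:05, so it is still considered on time. A larger interval tolerates more disorder at the cost of later results.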
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no** (internal planner API only)
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented? **not documented yet** (docs will be
added in a follow-up PR after initial review)
## Discussion Thread
https://lists.apache.org/thread/oonylk4h8dnsom40g8rr5k52zf3tz64v