weiqingy opened a new pull request, #28353:
URL: https://github.com/apache/flink/pull/28353

   ## What is the purpose of the change
   
   This pull request implements [FLIP-497: Early Fire Support for Flink SQL Interval Join](https://cwiki.apache.org/confluence/display/FLINK/FLIP-497%3A+Early+Fire+Support+for+Flink+SQL+Interval+Join).
   
   Today an outer interval join only emits an unmatched row, null-padded, once 
that row's time window has fully closed. For long windows this delays the 
null-padded result by the full window span even when a match will never arrive. 
FLIP-497 adds an `EARLY_FIRE` SQL join hint that lets an outer interval join 
emit the null-padded row speculatively after a configurable `delay`, then 
retract and correct it if a real match arrives later within the window. This 
turns the append-only interval-join result into an updating one: a speculative 
row that proves wrong is corrected downstream instead of standing as the final 
answer.
   
   The hint is opt-in and scoped: it affects only outer joins 
(`LEFT`/`RIGHT`/`FULL`) with a non-negative window span. Inner joins and 
negative-window joins remain append-only and ignore the hint. Each unmatched 
outer row fires at most once — the hint is single-fire, not periodic.
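
The scoping rule above can be read as a simple predicate. The sketch below is a minimal illustration only, not the planner code from this PR: `JoinType`, `early_fire_applies`, and the millisecond parameters are hypothetical names, and it assumes the window span is the upper bound minus the lower bound of the interval condition.

```python
from enum import Enum
from typing import Optional


class JoinType(Enum):
    INNER = "INNER"
    LEFT = "LEFT"
    RIGHT = "RIGHT"
    FULL = "FULL"


def early_fire_applies(join_type: JoinType,
                       window_span_ms: int,
                       hint_delay_ms: Optional[int]) -> bool:
    """Return True when the join would become update-producing.

    hint_delay_ms is None when no EARLY_FIRE hint is present (hypothetical
    encoding); window_span_ms is assumed to be upper bound minus lower bound.
    """
    if hint_delay_ms is None:
        return False  # hint not set: plain append-only interval join
    if join_type is JoinType.INNER:
        return False  # inner joins ignore the hint
    return window_span_ms >= 0  # negative-window joins ignore the hint
```

With the bounds used in the example query below (10 seconds before to 1 hour after), the span would be 3,610,000 ms under this assumption, so a `LEFT` join with the hint set qualifies.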
   
   Example:
   
   ```sql
   SELECT /*+ EARLY_FIRE('delay'='5s') */ o.id, s.ship_time
   FROM Orders o
   LEFT OUTER JOIN Shipments s
   ON o.id = s.order_id
   AND o.order_time BETWEEN s.ship_time - INTERVAL '10' SECOND
                        AND s.ship_time + INTERVAL '1' HOUR
   ```
   
   When an order has no shipment yet, the join emits `+I[id, NULL]` once the 
delay elapses. If a matching shipment later arrives inside the window, the 
speculative row is corrected with `-U[id, NULL]` followed by `+U[id, 
ship_time]`.
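
The emission sequence described above can be modeled for a single order as follows. This is a simplified sketch, not the operator logic: `early_fire_changelog` and its parameters are hypothetical, it assumes at most one shipment that already falls inside the join window, and it assumes a match arriving exactly at the fire time is handled after the timer.

```python
from typing import List, Optional


def early_fire_changelog(order_id: int,
                         order_time: int,
                         delay: int,
                         match_time: Optional[int] = None,
                         ship_time: Optional[str] = None) -> List[str]:
    """Changelog rows emitted for one order on the left side of the join.

    match_time is when the matching shipment is processed (None = no match).
    """
    fire_time = order_time + delay
    if match_time is not None and match_time < fire_time:
        # Matched before the early-fire timer: no speculative row is needed.
        return [f"+I[{order_id}, {ship_time}]"]

    # Timer fires first: emit the null-padded row speculatively, once.
    out = [f"+I[{order_id}, NULL]"]
    if match_time is not None:
        # A later match retracts the speculative row and emits the real one.
        out.append(f"-U[{order_id}, NULL]")
        out.append(f"+U[{order_id}, {ship_time}]")
    return out
```

For example, `early_fire_changelog(1, 0, 5000)` yields only the speculative `+I[1, NULL]`, while passing `match_time=60000` adds the `-U`/`+U` pair.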
   
   ## Brief change log
   
   The PR is organized as five sequential commits:
   
     - *Hint surface + planner plumbing (inert):* add 
`EarlyFireJoinHintOptions` (required `delay` duration, optional `time_mode` of 
`rowtime`/`proctime`), register the `EARLY_FIRE` hint in `FlinkHintStrategies` 
with a key-value option checker, and thread it through `JoinStrategy`, 
`CapitalizeQueryHintsShuttle`, `QueryHintsResolver`, the physical rule/node, 
and `StreamExecIntervalJoin`. The resolved delay and time mode are serialized 
as two additive `@JsonInclude(NON_NULL)` fields; the ExecNode metadata version 
is unchanged, so existing compiled plans restore as before. The operator 
ignores the fields at this stage.
     - *Changelog-mode inference:* split `StreamPhysicalIntervalJoin` into its 
own `ModifyKindSet` arm so it advertises `UPDATE` when the hint makes it 
update-producing (gated on hint set + outer join + non-negative window). An 
early-fire join feeding an insert-only downstream now fails planning with a 
tailored error that names the hint.
     - *Runtime early fire + retraction:* register an early-fire timer at 
`rowTime + delay` for each cached unmatched outer row; on the timer, emit the 
padded row as `+I` and record the fire in a new per-side `MapState<Long, 
List<Boolean>>` kept positionally aligned with the existing row cache (so the 
cache serializer is unchanged and old savepoints restore the new state empty). 
On a later match, retract with `-U` and emit the match as `+U`. Covers the 
natural pairings: row-time join fires on event time, processing-time join fires 
on processing time.
     - *Cross-domain processing-time fire on a row-time join:* support 
`EARLY_FIRE('time_mode'='proctime')` on an event-time interval join — 
speculative pads fire on the wall clock while event-time cleanup is retained. 
`onTimer` discriminates the two timer kinds via `OnTimerContext.timeDomain()`, 
and a per-side `MapState<Long, List<Long>>` maps each firing processing-time to 
the event-time bucket keys due then (allocated only in the cross-domain case). 
The planner's temporary "not yet supported" rejection is removed; 
`time_mode=rowtime` on a processing-time join stays rejected.
     - *Restore coverage + documentation:* add an `INTERVAL_JOIN_EARLY_FIRE` 
restore test program (event-time `LEFT OUTER` join against a changelog sink 
whose correction is only producible if the fired-bit state survives the 
savepoint), plus EN/ZH docs for the hint in the SQL joins reference.
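
To make the third commit's state layout concrete, here is a small in-memory model of a fired-flag map kept positionally aligned with the row cache. It is a sketch under assumptions, not the operator code: `OuterSideState` and its methods are hypothetical, plain dicts stand in for Flink `MapState`, each cache entry is modeled as `[row, matched]`, and emitting `+I` for any match after the first is an assumption this sketch makes.

```python
class OuterSideState:
    """One outer side: row cache plus a separate, index-aligned fired-flag map."""

    def __init__(self, cache=None):
        # row_time -> list of [row, matched]; stands in for the existing cache.
        self.cache = cache if cache is not None else {}
        # row_time -> list of bool, same order as cache[row_time].
        # Starts empty, as it would after restoring an older savepoint.
        self.fired = {}

    def _flags(self, row_time):
        rows = self.cache.get(row_time, [])
        flags = self.fired.setdefault(row_time, [])
        # Missing flags (new rows, or state restored empty) mean "not fired".
        flags.extend([False] * (len(rows) - len(flags)))
        return flags

    def add(self, row_time, row):
        self.cache.setdefault(row_time, []).append([row, False])
        self._flags(row_time)

    def fire(self, row_time):
        """Early-fire timer: pad every unmatched, not-yet-fired row once."""
        flags = self._flags(row_time)
        out = []
        for i, (row, matched) in enumerate(self.cache.get(row_time, [])):
            if not matched and not flags[i]:
                flags[i] = True
                out.append(("+I", row, None))
        return out

    def match(self, row_time, index, other_row):
        """A matching row arrived for cache[row_time][index]."""
        flags = self._flags(row_time)
        entry = self.cache[row_time][index]
        row, was_matched = entry
        entry[1] = True
        if flags[index] and not was_matched:
            # The padded row was already emitted: retract it, then correct.
            return [("-U", row, None), ("+U", row, other_row)]
        return [("+I", row, other_row)]
```

Because the flags live in their own map, the cache entries keep their original shape, which is the property the commit relies on to leave the cache serializer untouched.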
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Runtime harness tests in `RowTimeIntervalJoinTest` and 
`ProcTimeIntervalJoinTest` cover speculative emit, the `-U`/`+U` correction on 
a later match, single-fire when the delay is at or beyond the window span, the 
cross-domain wall-clock trigger without watermark advance, and snapshot/restore 
both before and after the pad is emitted.
     - Planner tests in `IntervalJoinTest` (Scala + `.xml`) cover hint 
parsing/validation, the resolved plan fields, the default `time_mode` 
resolution, and the `time_mode` error cases.
     - `IntervalJoinSpecJsonSerdeTest` confirms the additive JSON fields 
round-trip and that plans without them restore unchanged.
     - `IntervalJoinRestoreTest` runs the new `INTERVAL_JOIN_EARLY_FIRE` 
program end-to-end against a generated plan + savepoint fixture, exercising 
fired-bit state survival across restore.
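
The additive-field compatibility that the serde test checks can be illustrated outside Jackson. The sketch below only mimics the effect of `@JsonInclude(NON_NULL)` with Python's `json` module; the field names `joinType`, `earlyFireDelayMs`, and `earlyFireTimeMode` are hypothetical and are not taken from the PR.

```python
import json


def to_json(spec: dict) -> str:
    # Omit unset (None) fields, mirroring the effect of NON_NULL inclusion.
    return json.dumps({k: v for k, v in spec.items() if v is not None},
                      sort_keys=True)


def from_json(text: str) -> dict:
    raw = json.loads(text)
    return {
        "joinType": raw["joinType"],
        # Absent in plans written before the change: read back as unset.
        "earlyFireDelayMs": raw.get("earlyFireDelayMs"),
        "earlyFireTimeMode": raw.get("earlyFireTimeMode"),
    }
```

A plan written without the new fields parses with both left unset and serializes back to the same text, so no metadata version bump is needed for this kind of change.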
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes — `EarlyFireJoinHintOptions` is `@PublicEvolving` (the 
new user-facing hint surface from FLIP-497).
     - The serializers: no — the early-fired marker is held in a new separate 
`MapState` rather than widening the existing row-cache tuple, so the cache 
serializer is unchanged and old savepoints restore the new state empty. The two 
new ExecNode JSON fields are additive and `@JsonInclude(NON_NULL)`, leaving the 
metadata version unchanged.
     - The runtime per-record code paths (performance sensitive): yes — the 
interval-join operator's hot path is touched, but all early-fire work is gated 
on the hint being set, an outer join, and a non-negative window, so a plain 
interval join allocates nothing new and behaves as before.
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs (the SQL joins reference, EN 
+ ZH) and JavaDocs on the new option class.
   

