weiqingy opened a new pull request, #28353: URL: https://github.com/apache/flink/pull/28353
## What is the purpose of the change

This pull request implements [FLIP-497: Early Fire Support for Flink SQL Interval Join](https://cwiki.apache.org/confluence/display/FLINK/FLIP-497%3A+Early+Fire+Support+for+Flink+SQL+Interval+Join).

Today an outer interval join emits an unmatched, null-padded row only once that row's time window has fully closed. For long windows this delays the null-padded result by the full window span, even when no match will ever arrive.

FLIP-497 adds an `EARLY_FIRE` SQL join hint that lets an outer interval join emit the null-padded row speculatively after a configurable `delay`, and then retract and correct it if a real match arrives later within the window. This turns the append-only interval-join result into an updating one: a premature padded row is repaired by a correction instead of standing as a wrong final answer.

The hint is opt-in and scoped: it affects only outer joins (`LEFT`/`RIGHT`/`FULL`) with a non-negative window span. Inner joins and negative-window joins remain append-only and ignore the hint. Each unmatched outer row fires at most once; the hint is single-fire, not periodic.

Example:

```sql
SELECT /*+ EARLY_FIRE('delay'='5s') */ o.id, s.ship_time
FROM Orders o
LEFT OUTER JOIN Shipments s
  ON o.id = s.order_id
  AND o.order_time BETWEEN s.ship_time - INTERVAL '10' SECOND
                       AND s.ship_time + INTERVAL '1' HOUR
```

When an order has no shipment yet, the join emits `+I[id, NULL]` once the delay elapses. If a matching shipment later arrives inside the window, the speculative row is corrected with `-U[id, NULL]` followed by `+U[id, ship_time]`.
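The changelog contract above can be illustrated with a small model of a single outer-side row. This is a hypothetical sketch, not Flink code: `EarlyFireSemantics`, its methods, and its `JoinType` enum are illustrative names, and it assumes that a second match after the correction is emitted as a plain `+I`. It also encodes the gating rule stated above (hint set, outer join, non-negative window span).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the EARLY_FIRE changelog for ONE outer-side row.
 * Not Flink code: class, method, and enum names are illustrative only.
 */
public class EarlyFireSemantics {

    enum JoinType { INNER, LEFT, RIGHT, FULL }

    /** Gating as described in the PR: hint set + outer join + non-negative window span. */
    static boolean isUpdateProducing(boolean hintSet, JoinType type, long windowSpanMs) {
        return hintSet && type != JoinType.INNER && windowSpanMs >= 0;
    }

    private final String id;
    private final long fireAt;         // rowTime + delay
    private boolean fired = false;     // speculative pad has been emitted
    private boolean matched = false;   // at least one real match has been seen
    private final List<String> changelog = new ArrayList<>();

    EarlyFireSemantics(String id, long rowTime, long delayMs) {
        this.id = id;
        this.fireAt = rowTime + delayMs;
    }

    /** Time advances (watermark or wall clock, depending on time_mode). */
    void advanceTo(long time) {
        // Single-fire: once fired or matched, the row is never padded again.
        if (!fired && !matched && time >= fireAt) {
            changelog.add("+I[" + id + ", NULL]");
            fired = true;
        }
    }

    /** A matching row from the other side arrives inside the window. */
    void onMatch(String shipTime) {
        if (fired && !matched) {
            // Retract the speculative pad and replace it with the real result.
            changelog.add("-U[" + id + ", NULL]");
            changelog.add("+U[" + id + ", " + shipTime + "]");
        } else {
            // Ordinary interval-join output: no pad is outstanding.
            changelog.add("+I[" + id + ", " + shipTime + "]");
        }
        matched = true;
    }

    /** Window closes: the regular (non-early) pad, only if nothing was emitted yet. */
    void onWindowClose() {
        if (!fired && !matched) {
            changelog.add("+I[" + id + ", NULL]");
        }
    }

    List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        EarlyFireSemantics row = new EarlyFireSemantics("1", 0L, 5_000L);
        row.advanceTo(4_999L);      // delay not elapsed yet: no output
        row.advanceTo(5_000L);      // emits +I[1, NULL]
        row.advanceTo(9_000L);      // already fired: no output (single-fire)
        row.onMatch("10:00:07");    // emits -U[1, NULL], +U[1, 10:00:07]
        row.onWindowClose();        // matched: no extra pad
        System.out.println(row.changelog());
        // prints [+I[1, NULL], -U[1, NULL], +U[1, 10:00:07]]
    }
}
```

Keeping the "fired" flag separate from the "matched" flag is what makes the row single-fire: the window-close path checks both, so a row that was already padded early is not padded a second time.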
## Brief change log

The PR is organized as five sequential commits:

- *Hint surface + planner plumbing (inert):* add `EarlyFireJoinHintOptions` (required `delay` duration, optional `time_mode` of `rowtime`/`proctime`), register the `EARLY_FIRE` hint in `FlinkHintStrategies` with a key-value option checker, and thread it through `JoinStrategy`, `CapitalizeQueryHintsShuttle`, `QueryHintsResolver`, the physical rule/node, and `StreamExecIntervalJoin`. The resolved delay and time mode are serialized as two additive `@JsonInclude(NON_NULL)` fields; the ExecNode metadata version is unchanged, so existing compiled plans restore as before. The operator ignores the fields at this stage.
- *Changelog-mode inference:* split `StreamPhysicalIntervalJoin` into its own `ModifyKindSet` arm so it advertises `UPDATE` when the hint makes it update-producing (gated on hint set + outer join + non-negative window). An early-fire join feeding an insert-only downstream now fails planning with a tailored error that names the hint.
- *Runtime early fire + retraction:* register an early-fire timer at `rowTime + delay` for each cached unmatched outer row. When the timer fires, emit the padded row as `+I` and record the fire in a new per-side `MapState<Long, List<Boolean>>` kept positionally aligned with the existing row cache (so the cache serializer is unchanged and old savepoints restore the new state empty). On a later match, retract with `-U` and emit the match as `+U`. Covers the natural pairings: a row-time join fires on event time, and a processing-time join fires on processing time.
- *Cross-domain processing-time fire on a row-time join:* support `EARLY_FIRE('time_mode'='proctime')` on an event-time interval join; speculative pads fire on the wall clock while event-time cleanup is retained. `onTimer` discriminates the two timer kinds via `OnTimerContext.timeDomain()`, and a per-side `MapState<Long, List<Long>>` maps each firing processing time to the event-time bucket keys due then (allocated only in the cross-domain case). The planner's temporary "not yet supported" rejection is removed; `time_mode=rowtime` on a processing-time join stays rejected.
- *Restore coverage + documentation:* add an `INTERVAL_JOIN_EARLY_FIRE` restore test program (an event-time `LEFT OUTER` join against a changelog sink whose correction is only producible if the fired-bit state survives the savepoint), plus EN/ZH docs for the hint in the SQL joins reference.

## Verifying this change

This change added tests and can be verified as follows:

- Runtime harness tests in `RowTimeIntervalJoinTest` and `ProcTimeIntervalJoinTest` cover the speculative emit, the `-U`/`+U` correction on a later match, single-fire when the delay is at or beyond the window span, the cross-domain wall-clock trigger without watermark advance, and snapshot/restore both before and after the pad is emitted.
- Planner tests in `IntervalJoinTest` (Scala + `.xml`) cover hint parsing/validation, the resolved plan fields, the default `time_mode` resolution, and the `time_mode` error cases.
- `IntervalJoinSpecJsonSerdeTest` confirms that the additive JSON fields round-trip and that plans without them restore unchanged.
- `IntervalJoinRestoreTest` runs the new `INTERVAL_JOIN_EARLY_FIRE` program end-to-end against a generated plan + savepoint fixture, exercising fired-bit state survival across restore.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes; `EarlyFireJoinHintOptions` is `@PublicEvolving` (the new user-facing hint surface from FLIP-497).
- The serializers: no. The early-fired marker is held in a new, separate `MapState` rather than widening the existing row-cache tuple, so the cache serializer is unchanged and old savepoints restore the new state empty. The two new ExecNode JSON fields are additive and `@JsonInclude(NON_NULL)`, leaving the metadata version unchanged.
- The runtime per-record code paths (performance sensitive): yes. The interval-join operator's hot path is touched, but all early-fire work is gated on the hint being set, an outer join, and a non-negative window, so a plain interval join allocates nothing new and behaves as before.
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs (the SQL joins reference, EN + ZH) and JavaDocs on the new option class.
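The fired-bit layout from the runtime commit (a separate per-side `MapState<Long, List<Boolean>>` kept positionally aligned with the unchanged row cache) can be pictured with a small in-memory model. This is a hypothetical sketch: plain `HashMap`s stand in for Flink's keyed `MapState`, and `AlignedFiredBits`, `CachedRow`, and the method names are illustrative, not the PR's actual classes. It assumes that a missing or short fired-bit list (for example after restoring an old savepoint) should read as "not fired".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory sketch of fired bits kept positionally aligned with a row cache.
 * HashMaps stand in for Flink's keyed MapState; all names are illustrative.
 */
public class AlignedFiredBits {

    /** A cached outer-side row plus the cache's existing "matched" flag. */
    static final class CachedRow {
        final String id;
        boolean matched;

        CachedRow(String id) {
            this.id = id;
        }
    }

    // Existing state, layout unchanged: bucket (row time) -> cached rows.
    final Map<Long, List<CachedRow>> rowCache = new HashMap<>();
    // New state: bucket -> fired bits; index i describes rowCache.get(bucket).get(i).
    final Map<Long, List<Boolean>> firedBits = new HashMap<>();
    final List<String> out = new ArrayList<>();

    void cache(long bucket, String id) {
        rowCache.computeIfAbsent(bucket, k -> new ArrayList<>()).add(new CachedRow(id));
    }

    /** Bits for a bucket, padded with false so missing state reads as "not fired". */
    private List<Boolean> bitsFor(long bucket) {
        List<Boolean> bits = firedBits.computeIfAbsent(bucket, k -> new ArrayList<>());
        int size = rowCache.getOrDefault(bucket, Collections.emptyList()).size();
        while (bits.size() < size) {
            bits.add(false);
        }
        return bits;
    }

    /** Early-fire timer for one bucket: pad every unmatched, not-yet-fired row once. */
    void onEarlyFireTimer(long bucket) {
        List<CachedRow> rows = rowCache.getOrDefault(bucket, Collections.emptyList());
        List<Boolean> bits = bitsFor(bucket);
        for (int i = 0; i < rows.size(); i++) {
            if (!rows.get(i).matched && !bits.get(i)) {
                out.add("+I[" + rows.get(i).id + ", NULL]");
                bits.set(i, true);
            }
        }
    }

    /** A row from the other side matches cached row `index` of `bucket`. */
    void onMatch(long bucket, int index, String right) {
        CachedRow row = rowCache.get(bucket).get(index);
        List<Boolean> bits = bitsFor(bucket);
        if (bits.get(index) && !row.matched) {
            // The pad was already emitted: retract it and emit the real result.
            out.add("-U[" + row.id + ", NULL]");
            out.add("+U[" + row.id + ", " + right + "]");
        } else {
            out.add("+I[" + row.id + ", " + right + "]");
        }
        row.matched = true;
    }

    public static void main(String[] args) {
        AlignedFiredBits state = new AlignedFiredBits();
        state.cache(1_000L, "a");
        state.cache(1_000L, "b");
        state.onEarlyFireTimer(1_000L); // +I[a, NULL], +I[b, NULL]
        state.onEarlyFireTimer(1_000L); // no output: both bits already set
        state.onMatch(1_000L, 1, "x");  // -U[b, NULL], +U[b, x]
        System.out.println(state.out);
    }
}
```

Because the row cache's element type never changes, its serializer stays the same; only the side list grows, which matches the PR's rationale for why old savepoints restore with the new state simply empty.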
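The cross-domain commit routes wall-clock timers back to event-time buckets through a per-side `MapState<Long, List<Long>>`. A minimal sketch of that routing, assuming the processing-time fire is scheduled at arrival wall-clock time plus `delay` (an assumption; the PR text does not state the exact base time). `CrossDomainTimers` and its `Domain` enum are hypothetical stand-ins; in Flink the discrimination happens via `OnTimerContext.timeDomain()`, as described in the change log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of routing processing-time early-fire timers to event-time buckets
 * on a row-time join. The enum stands in for Flink's TimeDomain; other names are illustrative.
 */
public class CrossDomainTimers {

    enum Domain { EVENT_TIME, PROCESSING_TIME }

    // Firing processing time -> event-time bucket keys whose pads are due then.
    final Map<Long, List<Long>> dueBuckets = new HashMap<>();
    final List<String> log = new ArrayList<>();

    /** Called when a row is cached in an event-time bucket; returns the wall-clock fire time. */
    long registerEarlyFire(long eventTimeBucket, long nowProcTime, long delayMs) {
        long fireAt = nowProcTime + delayMs; // assumption: arrival wall clock + delay
        dueBuckets.computeIfAbsent(fireAt, k -> new ArrayList<>()).add(eventTimeBucket);
        return fireAt;
    }

    /** Single timer callback; the domain tells which kind of timer fired. */
    void onTimer(long timestamp, Domain domain) {
        if (domain == Domain.PROCESSING_TIME) {
            // Consume the mapping so each bucket list is fired only once.
            List<Long> buckets = dueBuckets.remove(timestamp);
            if (buckets != null) {
                for (long bucket : buckets) {
                    log.add("early-fire bucket " + bucket);
                }
            }
        } else {
            // Event-time timers keep driving state cleanup, as before.
            log.add("cleanup up to " + timestamp);
        }
    }

    public static void main(String[] args) {
        CrossDomainTimers t = new CrossDomainTimers();
        long fireAt = t.registerEarlyFire(1_000L, 50_000L, 5_000L);
        t.registerEarlyFire(2_000L, 50_000L, 5_000L);
        t.onTimer(fireAt, Domain.PROCESSING_TIME); // both buckets are due at 55000
        t.onTimer(3_000L, Domain.EVENT_TIME);
        System.out.println(t.log);
    }
}
```

Keying the side map by firing time lets one wall-clock timer serve every bucket due at that instant, while event-time cleanup timers are left exactly as they were.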
