HeartSaVioR opened a new pull request, #55474:
URL: https://github.com/apache/spark/pull/55474
### What changes were proposed in this pull request?

This PR changes `TransformWithStateExec.processTimers` to stop passing `prevBatchTimestampMs` / `eventTimeWatermarkForLateEvents` as the exclusive lower bound to `processorHandle.getExpiredTimers(...)`. Timer scans now always run without a lower bound (a full scan up to the current batch timestamp / eviction watermark).

Concretely:

- ProcessingTime branch: `getExpiredTimers(batchTimestamp, prevBatchTimestampMs)` -> `getExpiredTimers(batchTimestamp)`.
- EventTime branch: the `STATEFUL_OPERATOR_ALLOW_MULTIPLE` conditional that computed `prevWatermark` is removed; `getExpiredTimers(watermark, prevWatermark)` -> `getExpiredTimers(watermark)`.

The `TimerStateImpl.getExpiredTimers` signature is unchanged. Its `prevExpiryTimestampMs` parameter (added in SPARK-56400) is retained as a library primitive so that the optimization can be re-enabled in the future, once `registerTimer` enforces `ts > currentBatchTimestamp / watermark` server-side.

`TTLState.ttlEvictionIterator` is also unchanged. TTL expirations are always strictly above `prevBatchTimestampMs` by construction (TTL is processing-time-only, `ttlDuration` is validated to be `> 0`, and `batchTimestampMs` is monotonically non-decreasing), so the TTL path is not affected by this bug.

### Why are the changes needed?

SPARK-56400 introduced a bounded `rangeScan` in `TimerStateImpl.getExpiredTimers` whose exclusive lower bound is `prevBatchTimestampMs` (ProcessingTime) or `eventTimeWatermarkForLateEvents` (EventTime with multiple stateful operators, which is the default). `registerTimer` has no guard on the registered expiry, so a user can legally call `registerTimer(ts)` with `ts` at or below that lower bound. In that case the bounded scan excludes the entry, and because the lower bound is monotonically non-decreasing across batches, the timer never fires in any subsequent batch either: it is silently lost.
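The failure mode can be illustrated with a minimal Scala sketch, assuming a simplified in-memory timer store. `Timer`, `TimerScanSketch`, `expired`, and `run` are hypothetical names for illustration only, not Spark's `TimerStateImpl` API, and the sketch assumes a timer registered in a batch is visible to that same batch's timer scan. It replays processing-time batches and compares the bounded scan (exclusive lower bound = previous batch timestamp) with the unbounded full scan:

```scala
// Hypothetical model of a timer store; this is NOT Spark's TimerStateImpl.
final case class Timer(key: String, expiryMs: Long)

object TimerScanSketch {

  // Models the getExpiredTimers semantics: every pending timer with
  // expiryMs <= upperBoundMs and, if a lower bound is given, expiryMs > it.
  def expired(
      timers: Seq[Timer],
      upperBoundMs: Long,
      exclusiveLowerBoundMs: Option[Long]): Seq[Timer] =
    timers
      .filter(t => t.expiryMs <= upperBoundMs && exclusiveLowerBoundMs.forall(t.expiryMs > _))
      .sortBy(_.expiryMs)

  // Replays batches in order. registrations(i) holds timers registered in batch i,
  // assumed visible to batch i's own timer scan. Returns the fired keys per batch.
  def run(
      batchTimestampsMs: Seq[Long],
      registrations: Map[Int, Seq[Timer]],
      useLowerBound: Boolean): Seq[Seq[String]] = {
    var pending = Vector.empty[Timer]
    var prevBatchTimestampMs: Option[Long] = None
    batchTimestampsMs.toVector.zipWithIndex.map { case (batchTs, i) =>
      pending ++= registrations.getOrElse(i, Seq.empty[Timer])
      // Bounded scan (lower bound = previous batch timestamp) vs. full scan (no lower bound).
      val lower = if (useLowerBound) prevBatchTimestampMs else None
      val fired = expired(pending, batchTs, lower)
      // Fired timers are removed; timers skipped by the scan stay pending.
      pending = pending.filterNot(t => fired.contains(t))
      prevBatchTimestampMs = Some(batchTs)
      fired.map(_.key)
    }
  }
}
```

With batches at 1000, 2000 and 3000 ms, a timer registered at `0L` during the second batch falls outside `(1000, 2000]` and every later window, so `run(..., useLowerBound = true)` never reports it, while `run(..., useLowerBound = false)` fires it in that batch. A timer registered at 1500 ms in the same batch fires under both modes, which is why the bug only surfaces for expiries at or below the lower bound.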
Reproduction:

- ProcessingTime: from batch 2 onwards (`prevBatchTimestampMs` is non-`None`), any `registerTimer(ts <= prevBatchTimestampMs)` never fires.
- EventTime (default `STATEFUL_OPERATOR_ALLOW_MULTIPLE=true`): any `registerTimer(ts <= eventTimeWatermarkForLateEvents)` never fires.
- EventTime legacy single-op (`STATEFUL_OPERATOR_ALLOW_MULTIPLE=false`): unaffected, because `processTimers` already falls back to a full scan in that mode.

Affected idioms that the bug silently breaks:

- `registerTimer(0L)` as a "fire on the next batch" pattern.
- Event-time timers derived from a record's `event_time` in multi-op chains where a downstream operator's late-events watermark is looser than the upstream's.
- Any `registerTimer(ts)` that hits `ts <= prev` even once: the timer is dead forever unless the user also calls `deleteTimer`.

Reverting the lower bound is the most conservative fix. A complementary follow-up (tracked separately) should restrict the valid timestamp range on `registerTimer` so that `ts > currentBatchTimestamp / watermark` is enforced; once that lands, the bounded scan can be re-enabled.

### Does this PR introduce _any_ user-facing change?

No, since the bug has not been released yet.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7
