HeartSaVioR opened a new pull request, #55474:
URL: https://github.com/apache/spark/pull/55474

   ### What changes were proposed in this pull request?
   
   This PR changes `TransformWithStateExec.processTimers` to stop passing 
`prevBatchTimestampMs` / `eventTimeWatermarkForLateEvents` as the exclusive 
lower bound to `processorHandle.getExpiredTimers(...)`. Timer scans now always 
run without a lower bound (full scan up to the current batch timestamp / 
eviction watermark).
   
   Concretely:
   - ProcessingTime branch: `getExpiredTimers(batchTimestamp, 
prevBatchTimestampMs)` -> `getExpiredTimers(batchTimestamp)`.
   - EventTime branch: the `STATEFUL_OPERATOR_ALLOW_MULTIPLE` conditional that 
computed `prevWatermark` is removed; `getExpiredTimers(watermark, 
prevWatermark)` -> `getExpiredTimers(watermark)`.
   
   The `TimerStateImpl.getExpiredTimers` signature is unchanged — its 
`prevExpiryTimestampMs` parameter (added in SPARK-56400) is retained as a 
library primitive so that we can re-enable the optimization in the future once 
`registerTimer` enforces `ts > currentBatchTimestamp / watermark` server-side.
   
   `TTLState.ttlEvictionIterator` is also unchanged. TTL expirations are always 
strictly above `prevBatchTimestampMs` by construction (TTL is 
processing-time-only, `ttlDuration` is validated `> 0`, and `batchTimestampMs` 
is monotonically non-decreasing), so the TTL path is not affected by this bug.
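
   The TTL argument above can be checked with a small model. This is only a 
sketch: `TtlBoundSketch` and `neverBelowLowerBound` are hypothetical names, each 
batch is assumed to write one entry expiring at `batchTimestampMs + ttlDuration`, 
and eviction is assumed to remove entries whose expiration is at or below the 
batch timestamp. It does not use Spark's `TTLState`.

```scala
// Toy model of the TTL invariant; not Spark code.
object TtlBoundSketch {
  /** True if no live TTL entry ever sits at or below the exclusive lower bound. */
  def neverBelowLowerBound(batchTimestamps: List[Long], ttlDurationMs: Long): Boolean = {
    require(ttlDurationMs > 0, "ttlDuration must be positive")
    require(
      batchTimestamps.zip(batchTimestamps.drop(1)).forall { case (a, b) => a <= b },
      "batch timestamps must be non-decreasing")

    var live = Set.empty[Long]        // expiration timestamps of live entries
    var prev: Option[Long] = None     // previous batch timestamp (lower bound)
    batchTimestamps.forall { batchTs =>
      live += batchTs + ttlDurationMs // one write in this batch
      // Every live expiration must be strictly above the lower bound.
      val ok = live.forall(e => prev.forall(e > _))
      live = live.filter(_ > batchTs) // assumed eviction rule: expiration <= batchTs
      prev = Some(batchTs)
      ok
    }
  }
}
```

A new entry expires strictly after the current batch timestamp, and any older 
entry at or below the previous batch timestamp was already evicted in the 
previous batch, so the check holds for every non-decreasing sequence.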
   
   ### Why are the changes needed?
   
   SPARK-56400 introduced a bounded `rangeScan` in 
`TimerStateImpl.getExpiredTimers` whose exclusive lower bound is 
`prevBatchTimestampMs` (ProcessingTime) or `eventTimeWatermarkForLateEvents` 
(EventTime, multi-op, which is the default). `registerTimer` has no guard on the 
registered expiry, so a user can legally call `registerTimer(ts)` with `ts` at 
or below that lower bound. In that case the bounded scan excludes the entry, 
and because the lower bound is monotonically non-decreasing across batches, the 
timer is never fired in any subsequent batch either — silently lost.
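
   The failure mode can be illustrated with a toy model. This is a sketch under 
stated assumptions, not Spark's `TimerStateImpl`: a sorted in-memory set stands 
in for the timer state, `LostTimerSketch` and `run` are hypothetical names, the 
upper bound is assumed inclusive, and registrations made during a batch are 
assumed to be visible to that batch's timer scan.

```scala
// Toy model of the bounded vs. unbounded timer scan; not Spark code.
import scala.collection.immutable.TreeSet

object LostTimerSketch {
  /** Timers with prev < ts <= expiry; prev = None means a full scan. */
  def getExpiredTimers(
      timers: TreeSet[Long],
      expiryTimestampMs: Long,
      prevExpiryTimestampMs: Option[Long] = None): List[Long] =
    timers.iterator
      .filter(ts => prevExpiryTimestampMs.forall(ts > _)) // exclusive lower bound
      .takeWhile(_ <= expiryTimestampMs)                   // assumed inclusive upper bound
      .toList

  /**
   * Runs one timer scan per batch timestamp. `registrations` maps a 0-based
   * batch index to the expiries registered during that batch. Returns the
   * timers fired in each batch.
   */
  def run(
      batchTimestamps: List[Long],
      registrations: Map[Int, Set[Long]],
      bounded: Boolean): List[List[Long]] = {
    var timers = TreeSet.empty[Long]
    var prev: Option[Long] = None
    batchTimestamps.zipWithIndex.map { case (batchTs, i) =>
      timers = timers ++ registrations.getOrElse(i, Set.empty[Long])
      val fired = getExpiredTimers(timers, batchTs, if (bounded) prev else None)
      timers = timers -- fired // fired timers are removed from state
      prev = Some(batchTs)
      fired
    }
  }
}
```

With batches at 1000, 2000 and 3000 and a timer registered at `0L` during the 
second batch, the bounded run fires nothing in any batch, while the unbounded 
run fires the timer in the second batch.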
   
   Reproduction:
   
   - ProcessingTime: from batch 2 onwards (`prevBatchTimestampMs` is 
non-`None`), any `registerTimer(ts <= prevBatchTimestampMs)` never fires.
   - EventTime (default `STATEFUL_OPERATOR_ALLOW_MULTIPLE=true`): any 
`registerTimer(ts <= eventTimeWatermarkForLateEvents)` never fires.
   - EventTime legacy single-op (`STATEFUL_OPERATOR_ALLOW_MULTIPLE=false`): 
unaffected — `processTimers` already falls back to a full scan in that mode.
   
   Affected idioms the bug silently breaks:
   
   - `registerTimer(0L)` as a "fire on the next batch" pattern.
   - Event-time timers derived from a record's `event_time` in multi-op chains 
where a downstream operator's late-events watermark is looser than the upstream.
   - Any `registerTimer(ts)` that hits `ts <= prev` even once — the timer is 
dead forever unless the user also calls `deleteTimer`.
   
   Reverting the lower bound is the most conservative fix. A complementary 
follow-up (tracked separately) should restrict the valid timestamp range on 
`registerTimer` so that `ts > currentBatchTimestamp / watermark` is enforced; 
after that lands we can re-enable the bounded scan.
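
   As a rough illustration of what such a guard might look like (the follow-up 
is not part of this PR, so everything here is hypothetical, including the names 
`RegisterTimerGuardSketch` and `validateExpiry` and the choice to reject rather 
than clamp an out-of-range expiry):

```scala
// Hypothetical sketch of a registerTimer guard; not part of this PR.
object RegisterTimerGuardSketch {
  /**
   * Accepts an expiry only if it is strictly above the current bound
   * (the current batch timestamp or watermark, depending on the time mode).
   */
  def validateExpiry(expiryTimestampMs: Long, currentBoundMs: Long): Either[String, Long] =
    if (expiryTimestampMs > currentBoundMs) Right(expiryTimestampMs)
    else Left(s"timer expiry $expiryTimestampMs must be greater than $currentBoundMs")
}
```

Under such a guard every accepted expiry is above the current bound, which is 
in turn at or above the previous one, so an exclusive lower bound on the scan 
could no longer skip a registered timer.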
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. The bug was never included in a release.
   
   ### How was this patch tested?
   
   New unit tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Opus 4.7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

