Jungtaek Lim created SPARK-56566:
------------------------------------
Summary: transformWithState silently drops user timers registered
at or below the previous batch's lower bound
Key: SPARK-56566
URL: https://issues.apache.org/jira/browse/SPARK-56566
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 4.2.0
Reporter: Jungtaek Lim
(This is a correctness issue, but the change that introduced it has not been
released yet, so it is marked only as Major.)
https://issues.apache.org/jira/browse/SPARK-56400 introduced a bounded
rangeScan in TimerStateImpl.getExpiredTimers with an exclusive lower bound set
from prevBatchTimestampMs (ProcessingTime) or eventTimeWatermarkForLateEvents
(EventTime multi-op).
This relies on a general assumption about stateful operators: once eviction
has run, no state is written at or below the evicted timestamp again.
Unfortunately, that assumption does not hold for timers. registerTimer has no
guard on the registered expiry, so a user can legally call registerTimer(ts)
with ts at or below that lower bound. In that case, the bounded scan excludes
the entry, and because prevBatchTimestampMs / lateEventsWatermark are
monotonically non-decreasing, the timer is never fired in any subsequent batch
either. It is silently lost.
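
The failure mode can be illustrated with a small toy model. This is a sketch
in plain Python, not Spark code: ToyTimerStore, get_expired and run_batches
are hypothetical names made up for the illustration. It assumes the upper
bound of the scan is inclusive, that the first batch has no lower bound, and
that timers registered during a batch are first considered in the next batch.

```python
# Toy model of a bounded expired-timer scan; NOT Spark code.
# All names below are hypothetical and exist only for this illustration.

class ToyTimerStore:
    def __init__(self):
        self.timers = set()  # registered expiry timestamps, in ms

    def register_timer(self, expiry_ms):
        # Mirrors the report: no guard on the registered expiry.
        self.timers.add(expiry_ms)

    def get_expired(self, lower_exclusive_ms, upper_inclusive_ms):
        # Visit only timers in (lower, upper]; fired timers are removed.
        expired = sorted(
            t for t in self.timers
            if lower_exclusive_ms < t <= upper_inclusive_ms
        )
        self.timers.difference_update(expired)
        return expired


def run_batches(batch_times_ms, registrations, bounded):
    """Run batches in order and return (fired, still_registered).

    registrations maps a batch timestamp to the expiries registered
    during that batch (assumed to happen after that batch's scan).
    """
    store = ToyTimerStore()
    fired = []
    prev = None
    for now in batch_times_ms:
        if bounded and prev is not None:
            lower = prev  # exclusive lower bound from the previous batch
        else:
            lower = float("-inf")  # unbounded scan
        fired.extend(store.get_expired(lower, now))
        for ts in registrations.get(now, []):
            store.register_timer(ts)
        prev = now
    return fired, sorted(store.timers)


if __name__ == "__main__":
    batches = [1000, 2000, 3000, 4000]
    # During the batch at 2000 (previous batch: 1000), register one timer
    # below the lower bound (900) and one in the future (2500).
    regs = {2000: [900, 2500]}
    print(run_batches(batches, regs, bounded=True))   # ([2500], [900])
    print(run_batches(batches, regs, bounded=False))  # ([900, 2500], [])
```

With the bounded scan, the timer at 900 stays in the store and is never
returned, because every later lower bound is at least 2000. The unbounded scan
fires it in the next batch.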
--
This message was sent by Atlassian Jira
(v8.20.10#820010)