I understand your point that during query evolution, the non-deterministic UIDs
of operators—other than PTF—prevent the job from being properly restored from a
savepoint.
IIUC, regarding sources specifically, you’re highlighting that changes in
topological ordering cause the source UID to change (i.e., the first part of
<id>_<transformation> is updated). In such cases, by enabling
nonrepresentational and explicitly specifying a new source offset (e.g., via
RESUME_OR_FROM_TIMESTAMP), the new offset overrides the one stored in the
state. However, this internal behavior is somewhat tricky: if the source UID
remains unchanged, the resulting behavior can become confusing or inconsistent.
Let’s put the implementation details aside. From a user’s perspective, I think
it’s unclear what the consistent, well-defined behavior should be for the
source offset stored in state under these two configurations: (Users shouldn't
need to distinguish whether the internal source UID has changed in order to
understand the resulting behavior.)
- STATE_RETENTION = ALL with START_MODE = BEGINNING
- STATE_RETENTION = NONE with START_MODE = RESUME_OR_...
Additional, after query modifications or multi-version optimizer evolution, UID
changes often make STATE_RETENTION = ALL impractical. Nevertheless, I believe
the interaction between STATE_RETENTION and START_MODE options should still be
clearly documented and exposed to users. What do you think?
--
Best!
Xuyang
At 2026-02-16 20:29:26, "Timo Walther" <[email protected]> wrote:
>Hi everyone,
>
>due to private reasons, Ramin is currently not able to followup on this
>thread. Let me try to steer the discussion until he is back.
>
>Let me also try to give more background and thoughts:
>
>Overall this FLIP is just a SQL API to what is possible already with
>DataStream API and Flink CLI commands.
>
>In DataStream API one can set the specific offsets in code but still
>submit the jar and restore from a savepoint. In those cases, the state
>(not the code definition) is used for the offsets. The Flink CLI offers
>commands for restoring state and skipping parts of it via
>allowNonRestoredState. But overall the validation is very thin, because
>Flink does not actively check whether the previous pipeline was actually
>stateful or whether the UID of the Kafka source has been changed and
>thus offsets of code take precedence over offsets in savepoint.
>
>We are not planning to change the engine significantly for this FLIP.
>Many combinations won't be supported initially, but the long-term goal
>is to support more combinations and of course more stateful evolutions
>eventually. In the mid-term, I only see the PTF_ONLY case as realistic,
>because PTFs are self-contained and identifiable via a UID. Optimizer
>rules that reorder the topology make other state retention policies
>difficult to implement.
>
>Nevertheless, both START_MODE and STATE_RETENTION are two dimensions
>that help us in capturing a user's intent for "where to start?" and
>"what to memorize?". With this intent we can improve the validation
>layer and inform users that changing a streaming query significantly
>without reprocessing won't work. Many users often do not understand the
>limitations of query evolution. The goal is also to make realities more
>explicit.
>
>And as the FLIP also mentions in a small side note:
>
>"capability relies on the Catalog and Connector implementation"
>
>So this FLIP wraps existing API for state retention, but at the same
>time also makes it possible for catalog/connector implementors to come
>up with their custom logic for how RESUME_OR_FROM_BEGINNING or
>FROM_BEGINNING is handled. For example, in case of a FROM_BEGINNING the
>catalog could instruct the source to assign a new uid, in which case the
>offsets take precendence over the savepoint.
>
>Looking forward to your thoughts.
>
>Cheers,
>Timo
>
>
>
>
>One can specify the offset
>
>
>On 09.02.26 04:01, Xuyang wrote:
>> OK, I now understand why PTF_ONLY was introduced. Additionally, I have a few
>> more questions I want to clarify:
>> 1. Since we've planed to exposed APIs that allow users to combine
>> STATE_RETENTION and START_MODE, we should clearly document how these
>> settings interact—specifically, their effective behaviors and potential
>> pitfalls. This includes aspects users must handle themselves, such as
>> inconsistencies between state and source data, or operators potentially
>> consuming redundant records. To that end, should we complete a matrix
>> covering all possible combinations of these parameters, annotate it with
>> clear user guidance, and push it in the official documentation?
>> 2. I also have a concern, similar to Ron’s question, regarding the fact that
>> the offset in the source is effectively part of the job’s state. The source
>> behavior becomes ambiguous due to conflicting offset semantics between these
>> two parameters:
>> a) STATE_RETENTION = ALL and START_MODE = BEGINNING
>> b) STATE_RETENTION = NONE and START_MODE = RESUME_OR_...
>> By the way, I saw your response to Ron’s question. I’m not very familiar
>> with the Kafka connector, but from the code snippet, it appears that the
>> offsets committed to Kafka (based on the latest checkpoint) are done
>> asynchronously [1] and without guaranteed success [2].
>> I’m still not quite clear on what you meant by "decouple offset management
>> from state management"—is there a specific plan or design goal behind this?
>> In my view, offsets naturally belong to the job’s state. The reason is that
>> external systems (like Kafka) aren’t well-suited for tracking offsets across
>> multiple checkpoints—they typically only retain the latest committed offset.
>> Relying solely on external storage to record the last consumed position can
>> be unreliable and limits our ability to restore from arbitrary checkpoints.
>> 3. Friendly reminder: please incorporate key details from discussion in this
>> mail list into the FLIP document such as behavioral differences between
>> streaming and batch modes.
>>
>>
>> [1]
>> https://github.com/apache/flink-connector-kafka/blob/cdbd635171fc4322ba7182eb93e920472d6d9d91/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L258
>> [2]
>> https://github.com/apache/flink-connector-kafka/blob/cdbd635171fc4322ba7182eb93e920472d6d9d91/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L153
>>
>>
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>> At 2026-02-06 21:08:29, "Ramin Gharib" <[email protected]> wrote:
>>> Hi Xuyang,
>>>
>>> Thanks for the detailed questions. You've highlighted some important edge
>>> cases.
>>>
>>> It is important to note that STATE_RETENTION (especially PTF_ONLY) is
>>> designed as a "power user" feature. It provides granular control, but with
>>> that comes the responsibility to understand the implications of mixing old
>>> state with new logic.
>>>
>>> Here are the clarifications for your specific points:
>>>
>>> 1. Potential Inconsistencies with PTF_ONLY This is a very valid point. When
>>> using surgical state retention (PTF_ONLY), it is indeed the user's
>>> responsibility to ensure that the retained state is semantically compatible
>>> with the new query logic. Flink guarantees technical consistency (the bytes
>>> will load), but semantic consistency (e.g., "Does this old window state
>>> make sense with my new filter?") is up to the user.
>>> If a user cannot guarantee this safety, they should default to
>>> STATE_RETENTION = NONE to ensure correctness. This aligns with how the
>>> "State Processor API" or allowNonRestoredState works in the DataStream
>>> API—it is a sharp tool for experts.
>>>
>>> 2. Scope of Granularity (JOIN_ONLY, AGG_ONLY vs PTF) That is an interesting
>>> idea, but we have technical reasons for focusing on PTFs first.
>>> First, standard SQL operators (Joins, Aggregates) are generated by the
>>> planner, and their internal state structure and UIDs are tightly coupled to
>>> the physical plan. Making these "evolvable" by name would require a massive
>>> overhaul of how Flink SQL assigns UIDs. Second, an option like JOIN_ONLY
>>> would presuppose that the Join operator itself supports state evolution
>>> (e.g., handling topology changes while keeping state). Currently, standard
>>> SQL operators do not provide this guarantee.
>>> PTFs, on the other hand, act as "black boxes" with explicit, user-defined
>>> state and stable naming, making them the ideal candidate for this initial
>>> version. We are currently unsure of the Return on Investment for making
>>> internal SQL operators addressable at this stage.
>>>
>>> 3. Interaction between STATE_RETENTION = ALL and START_MODE =
>>> FROM_BEGINNING We debated this combination extensively. The motivation for
>>> supporting this is to enable Bootstrap scenarios.
>>> For example, a user might want to reprocess historical data to fix a data
>>> quality issue (the "From Beginning" part) but effectively "merge" this
>>> fixed history into their existing, valuable running aggregates (the "All
>>> State" part). This acts like an idempotent Upsert pipeline: you keep the
>>> continuity of the live view while correcting the underlying dataset.
>>> Ultimately, we view the validity of these combinations as a concern for the
>>> Catalog Implementation. The Catalog should validate the intent and reject
>>> combinations it cannot support.
>>>
>>> 4. STATE_RETENTION Behavior in Batch Mode In Batch mode, STATE_RETENTION
>>> has no semantic meaning. While we understand the desire for a unified API,
>>> we believe that silently ignoring the flag creates dangerous ambiguity.
>>> To follow the "Fail Fast" principle, we prefer to throw an exception if
>>> this clause is used in Batch mode. This is analogous to how Flink handles
>>> state restoration mismatches: if a user requests a specific state behavior
>>> (e.g., restoring a savepoint) that cannot be fulfilled by the current job
>>> graph, Flink throws an exception rather than silently dropping the state
>>> (unless explicitly overridden by flags like allowNonRestoredState).
>>> Similarly, if a user requests STATE_RETENTION=ALL, and the Batch engine
>>> cannot fulfill it, we should fail explicitly rather than silently ignoring
>>> the user's intent.
>>>
>>> Best,
>>>
>>> Ramin
>>>
>>> On Mon, Dec 1, 2025 at 6:23 PM Ramin Gharib <[email protected]> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I would like to start a discussion on FLIP-557: Granular Control over Data
>>>> Reprocessing and State Retention in Materialized Table Evolution [1].
>>>>
>>>> Currently, ALTER MATERIALIZED TABLE forces a full job restart and
>>>> discards state, which is inefficient for many evolution scenarios. FLIP-557
>>>> proposes decoupling data scope from state management by introducing two new
>>>> optional clauses:
>>>> 1. START_MODE*:* Controls the data processing window (e.g., FROM_BEGINNING,
>>>> RESUME_OR_...).
>>>>
>>>> 2. STATE_RETENTION*:* Controls how existing state is handled (e.g., NONE,
>>>> PTF_ONLY).
>>>>
>>>> This gives users explicit control over cost and correctness during table
>>>> evolution.
>>>>
>>>> For more details, please refer to the FLIP [1].
>>>>
>>>> Looking forward to your feedback and thoughts!
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-557%3A+Granular+Control+over+Data+Reprocessing+and+State+Retention+in+Materialized+Table+Evolution
>>>>
>>>> Best regards,
>>>>
>>>> Ramin Gharib
>>>>