Hi Weiqing,
I like the idea. Would you give an example of how a Kafka or MySQL connector
would use it to read data with different schemas, for better understanding?

Best,
Hongshun

On Thu, Jul 24, 2025 at 10:34 AM Shengkai Fang <fskm...@gmail.com> wrote:

> Hi, Weiqing.
>
> Thanks for the update. It would be better to update the cwiki rather than
> the Google doc.
>
> After reading the doc, I feel this feature is hard for users to apply,
> because few users understand the SQL operator state structure, and in some
> cases the operator state structure is related to the input operator schema
> (e.g., the Join operator). My suggestion is that the FLIP should focus on
> the case where the input schema changes while the SQL stays unchanged.
>
> If we assume the SQL is unchanged and the input schema changes, in most
> cases the planner will prune the unused columns. Could you give me a
> detailed example where name-based state migration works?
>
> Best,
> Shengkai
>
Weiqing Yang <yangweiqing...@gmail.com> wrote on Wed, Jul 23, 2025 at 12:03:
>
> > Hi Shengkai,
> >
> > Thank you for your detailed feedback!
> >
> > I've updated the proposal to incorporate your suggestions:
> >
> >    1. fieldNames vs. originalRowType: I agree that using fieldNames:
> >    String[] is a more focused and lightweight approach than storing the
> >    full originalRowType. The proposal has been updated accordingly.
> >    2. Potential risks with name-based mapping: You raised a good point
> >    regarding risks in scenarios such as SQL query changes on aggregation.
> >    To mitigate this, I've introduced a configuration option
> >    state.schema-evolution.enable (default: false), allowing users to
> >    explicitly opt in once they've verified that schema evolution is safe
> >    for their use case (see the sketch after this list).
> >    3. Examples clarification: To better illustrate the boundary between
> >    supported and unsupported changes, I've expanded the examples section.
> >    It now clearly distinguishes supported cases (e.g., adding nullable
> >    fields, reordering) from unsupported ones (e.g., removing fields,
> >    changing field types).
> >    4. User perspective: I've also updated the proposal to clarify when
> >    schema evolution is supported and when it isn't, along with the
> >    expected user workflow. This should help make the feature easier to
> >    understand and adopt.
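> >
> > As a minimal sketch of the opt-in from item 2 (the option key is the one
> > proposed in the FLIP; the surrounding class and setter usage are only
> > illustrative, not part of the proposal):
> >
> > ```
> > import org.apache.flink.configuration.Configuration;
> >
> > public class EnableSchemaEvolution {
> >     public static Configuration conf() {
> >         Configuration conf = new Configuration();
> >         // Proposed flag, default false: name-based RowData state
> >         // migration only runs when the user explicitly enables it.
> >         conf.setString("state.schema-evolution.enable", "true");
> >         return conf;
> >     }
> > }
> > ```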
> >
> > Please let me know if you have any further suggestions or questions.
> >
> > Thanks again for your valuable input.
> >
> > Best,
> > Weiqing
> >
> > On Sun, Jul 20, 2025 at 10:14 PM Shengkai Fang <fskm...@gmail.com> wrote:
> >
> > > Hi Weiqing.
> > >
> > > +1 for the FLIP. I have some suggestions about the FLIP:
> > >
> > > 1. Compared to adding a field named originalRowType in
> > > `RowDataSerializer`, I prefer to add a field named fieldNames of type
> > > String[]. WDYT? This field is only used for name-based field mapping,
> > > so we just add the required names to the serializer.
> > >
> > > 2. This FLIP shows that if the state corresponding to an operator's ID
> > > is retained in the state backend, the operator restores the state
> > > according to name-based field mapping. But I think name-based mapping
> > > is dangerous in some cases. For example, suppose a user's job has an
> > > aggregation Aggregate(groupBy(a, b), SUM(c)). After modification, the
> > > job has Aggregate(groupBy(a, b), SUM(d), SUM(c)). In this case, SUM(d)'s
> > > agg buffer name is agg0_max and SUM(c)'s is agg1_max after the
> > > modification, but before the modification SUM(c)'s agg buffer name was
> > > agg0_max, so name-based mapping would silently restore SUM(c)'s old
> > > buffer into SUM(d). How about introducing an option to let users
> > > determine whether to enable schema migration?
> > >
> > > 3. The example in the FLIP is not very good, because the current
> > > proposal still cannot migrate state if a type changes, right? I see the
> > > FLIP says: "Incompatible changes (removing fields, changing field types)
> > > will still be rejected with clear error messages".
> > >
> > > 4. It is better to clarify when schema migration works and when it does
> > > not from the user's perspective. The current proposal mainly focuses on
> > > the operator perspective, which may not be user-friendly.
> > >
> > > Best,
> > > Shengkai
> > >
> > > Weiqing Yang <yangweiqing...@gmail.com> wrote on Fri, Jul 18, 2025 at 14:42:
> > >
> > > > Hi Zakelly and Hangxiang,
> > > >
> > > > Just checking in - do you have any concerns or feedback?
> > > >
> > > > If there are no further objections from anyone, I’ll mark the FLIP as
> > > > ready for voting.
> > > >
> > > >
> > > > Best,
> > > > Weiqing
> > > >
> > > > On Sun, Jul 6, 2025 at 11:46 PM Weiqing Yang <yangweiqing...@gmail.com> wrote:
> > > >
> > > > > Hi Hangxiang, Zakelly,
> > > > >
> > > > > Thank you for the careful review and the +1 on the proposal.
> > > > >
> > > > > *1. Where to host the migration logic*
> > > > >
> > > > > I experimented with placing the migration hook on
> > > > > TypeSerializerSchemaCompatibility, but ran into two issues:
> > > > >
> > > > >    - Is the "schemaEvolutionSerializer" intended to be the new
> > > > >    TypeSerializer? The migration needs access to both the
> > > > >    DataInputDeserializer (the value) and the new TypeSerializer.
> > > > >    - TypeSerializerSchemaCompatibility is currently designed as a
> > > > >    result holder, not an executor, so keeping the procedural logic
> > > > >    inside TypeSerializerSnapshot seems clearer.
> > > > >
> > > > > *2. Naming the snapshot field*
> > > > >
> > > > > I can change the field to `oldSerializerSnapshot` for consistency
> > > > > with `resolveSchemaCompatibility()`, if you think that's clearer.
> > > > > Note that migrateState() will still require the new serializer, so
> > > > > the method signature will remain migrateState(oldSnapshot,
> > > > > newSerializer, dataInput, ...).
> > > > >
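> > > > > For clarity, a rough sketch of the shape I have in mind (the method
> > > > > name and parameters follow this thread; the default body shown here
> > > > > is only illustrative, mirroring today's deserialize-then-serialize
> > > > > migration path):
> > > > >
> > > > > ```
> > > > > // Sketch only: a default method on TypeSerializerSnapshot. The
> > > > > // fallback behaves like the existing migration (read with the old
> > > > > // serializer, write with the new one); RowDataSerializerSnapshot
> > > > > // would override it to do name-based field mapping.
> > > > > default void migrateState(
> > > > >         TypeSerializerSnapshot<T> oldSerializerSnapshot,
> > > > >         TypeSerializer<T> newSerializer,
> > > > >         DataInputDeserializer in,
> > > > >         DataOutputSerializer out) throws IOException {
> > > > >     T value = oldSerializerSnapshot.restoreSerializer().deserialize(in);
> > > > >     newSerializer.serialize(value, out);
> > > > > }
> > > > > ```
> > > > >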
> > > > > *3. Need for migrateElement()*
> > > > >
> > > > > I initially tried relying only on migrateState(), but for
> > > > > RocksDBListState the code became much less clean, as it stores list
> > > > > elements individually with delimiters. A dedicated migrateElement()
> > > > > method keeps that migration logic more readable (and slightly
> > > > > improves performance) for ListState.
> > > > >
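> > > > > To illustrate (a sketch only: it assumes RocksDB list state's
> > > > > delimiter-separated layout, and the migrateElement() call mirrors
> > > > > the proposed API, whose exact signature is still open):
> > > > >
> > > > > ```
> > > > > // Walk a serialized list element by element, migrating each one.
> > > > > void migrateList(
> > > > >         TypeSerializerSnapshot<T> oldSnapshot,
> > > > >         TypeSerializer<T> newElementSerializer,
> > > > >         DataInputDeserializer in,
> > > > >         DataOutputSerializer out) throws IOException {
> > > > >     boolean first = true;
> > > > >     while (in.available() > 0) {
> > > > >         if (!first) {
> > > > >             in.readByte();   // consume the old delimiter byte
> > > > >             out.write(','); // re-emit a delimiter for the new list
> > > > >         }
> > > > >         first = false;
> > > > >         // Per-element hook: reads one old element, writes one new one.
> > > > >         oldSnapshot.migrateElement(newElementSerializer, in, out);
> > > > >     }
> > > > > }
> > > > > ```
> > > > >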
> > > > > For context, I cherry-picked our internal PR (used in production on
> > > > > Flink v1.16) that illustrates these points in practice:
> > > > > https://github.com/weiqingy/flink/commit/00539b16cc55bcd144ba65c052142fbe6a556842
> > > > >
> > > > > I’m happy to iterate further - please let me know your thoughts.
> > > > >
> > > > >
> > > > > Thanks again!
> > > > > Weiqing
> > > > >
> > > > > On Tue, May 6, 2025 at 11:54 PM Hangxiang Yu <master...@gmail.com> wrote:
> > > > >
> > > > >> Hi, Weiqing.
> > > > >> Thanks for driving this FLIP.
> > > > >> I'm +1 for supporting schema evolution for SQL RowData type.
> > > > >>
> > > > >> I just have some questions:
> > > > >> 1. Could we consider defining a method returning
> > > > >> *SchemaEvolutionSerializer* in *TypeSerializerSchemaCompatibility*
> > > > >> (like compatibleAfterMigration(TypeSerializer<T>
> > > > >> schemaEvolutionSerializer))? Then we would only need to care about
> > > > >> implementing the schema evolution in the form of a serializer
> > > > >> (which could also be supported automatically, since we implement it
> > > > >> internally and call it in the internal state). I think this may be
> > > > >> better because *TypeSerializerSnapshot* is a common interface and
> > > > >> many of its implementations may not really need *migrateState*.
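> > > > >>
> > > > >> If I understand the suggestion, the restore path might look roughly
> > > > >> like this (a sketch only: getSchemaEvolutionSerializer() is a
> > > > >> hypothetical accessor, not existing API, and the surrounding
> > > > >> variables are assumed to be in scope):
> > > > >>
> > > > >> ```
> > > > >> TypeSerializerSchemaCompatibility<RowData> compat =
> > > > >>         newSnapshot.resolveSchemaCompatibility(oldSnapshot);
> > > > >> if (compat.isCompatibleAfterMigration()) {
> > > > >>     // Hypothetical: the compatibility result carries a serializer
> > > > >>     // that knows how to read the old layout.
> > > > >>     TypeSerializer<RowData> evolution = compat.getSchemaEvolutionSerializer();
> > > > >>     RowData value = evolution.deserialize(in); // reads old bytes
> > > > >>     newSerializer.serialize(value, out);       // writes new layout
> > > > >> }
> > > > >> ```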
> > > > >>
> > > > >> 2. Considering the semantics of *TypeSerializerSnapshot*, I'd also
> > > > >> suggest changing the field to *oldSerializerSnapshot*, which is
> > > > >> also consistent with *resolveSchemaCompatibility*.
> > > > >>
> > > > >> 3. Do we really need an extra *migrateElement* method? Or, if we
> > > > >> go with the option of defining *SchemaEvolutionSerializer*, could
> > > > >> the element schema evolution serializer just be a special
> > > > >> *SchemaEvolutionSerializer*?
> > > > >>
> > > > >> On Tue, Apr 29, 2025 at 2:30 PM Weiqing Yang <yangweiqing...@gmail.com> wrote:
> > > > >>
> > > > >> > Thanks for the suggestions, Zakelly!
> > > > >> >
> > > > >> > Regarding *migrateElement* - it is specifically needed for
> > > > >> > ListState, which stores elements individually with delimiters.
> > > > >> > Its implementation deserializes and processes each element one by
> > > > >> > one during migration, so I introduced the *migrateElement* API to
> > > > >> > handle this per-element processing.
> > > > >> >
> > > > >> > Regarding the *migrateState* signature - I'm open to suggestions.
> > > > >> > My original design aimed to align with the existing
> > > > >> > implementations in RocksDBMapState, RocksDBListState, and
> > > > >> > AbstractRocksDBState. For example, in RocksDBMapState (v1.20), the
> > > > >> > migrateSerializedValue method
> > > > >> > <https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243>
> > > > >> > first deserializes the old value and then serializes it with the
> > > > >> > new serializer:
> > > > >> >
> > > > >> > ...
> > > > >> > if (!isNull) {
> > > > >> >     mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput);
> > > > >> > }
> > > > >> > ...
> > > > >> > newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput);
> > > > >> >
> > > > >> > Here, snapshotConfiguration().migrateState is called as:
> > > > >> >
> > > > >> > if (!isNull) {
> > > > >> >     priorMapValueSerializer.snapshotConfiguration().migrateState(
> > > > >> >             priorMapValueSerializer, newSerializer,
> > > > >> >             serializedOldValueInput, serializedMigratedValueOutput);
> > > > >> > }
> > > > >> >
> > > > >> > The idea was to mirror this structure - delegate the migration
> > > > >> > logic to priorSerializer.snapshotConfiguration(), passing both the
> > > > >> > prior and new serializers.
> > > > >> >
> > > > >> > On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > >> >
> > > > >> > > Hi Weiqing,
> > > > >> > >
> > > > >> > > Thanks for your answers!
> > > > >> > >
> > > > >> > > It seems a simple deserialization-serialization step lacks
> > > > >> > > flexibility, so I'd agree to introduce new methods.
> > > > >> > > I'd suggest changing the signature to:
> > > > >> > > ```
> > > > >> > >  public void migrateState(
> > > > >> > >                TypeSerializerSnapshot<T> oldSerializerSnapshot,
> > > > >> > >                DataInputDeserializer in,
> > > > >> > >                DataOutputSerializer out) throws IOException
> > > > >> > > ```
> > > > >> > > which is more aligned with the other methods under
> > > > >> > > `TypeSerializerSnapshot`. WDYT?
> > > > >> > >
> > > > >> > > And another question: Could you describe in which case we need
> > > > >> > > `migrateElement`?
> > > > >> > >
> > > > >> > >
> > > > >> > > Best,
> > > > >> > > Zakelly
> > > > >> > >
> > > > >> > > On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com> wrote:
> > > > >> > >
> > > > >> > > > Hi Zakelly,
> > > > >> > > >
> > > > >> > > > Thanks for your feedback.
> > > > >> > > >
> > > > >> > > > You're right - *resolveSchemaCompatibility* is critical for
> > > > >> > > > identifying schema compatibility. However, our challenge
> > > > >> > > > extends beyond detection to handling the actual migration
> > > > >> > > > process, particularly given RowData's complex requirements.
> > > > >> > > >
> > > > >> > > > The standard migration logic in *AbstractRocksDBState* isn't
> > > > >> > > > sufficient for RowData because, during migration, we need to:
> > > > >> > > >
> > > > >> > > >    - Add null values for newly added fields
> > > > >> > > >    - Reorder fields based on field names in the new schema
> > > > >> > > >    - Recursively handle nested structures
> > > > >> > > >    - Apply different migration logic depending on the state
> > > > >> > > >    type (e.g., *ListState* uses *migrateElement*(), *MapState*
> > > > >> > > >    uses *migrateState*())
> > > > >> > > >
> > > > >> > > > The current approach:
> > > > >> > > >
> > > > >> > > > V value = priorSerializer.deserialize(serializedOldValueInput);
> > > > >> > > > newSerializer.serialize(value, serializedMigratedValueOutput);
> > > > >> > > >
> > > > >> > > > doesn't offer enough control for these needs.
> > > > >> > > >
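> > > > >> > > > As a sketch of what the first two bullets above mean in
> > > > >> > > > practice (illustrative only: remapByName is a hypothetical
> > > > >> > > > helper, and a real implementation would recurse into nested
> > > > >> > > > rows and work through the serializers rather than
> > > > >> > > > GenericRowData directly):
> > > > >> > > >
> > > > >> > > > ```
> > > > >> > > > import java.util.Arrays;
> > > > >> > > > import org.apache.flink.table.data.GenericRowData;
> > > > >> > > >
> > > > >> > > > // Name-based projection from the old row layout to the new one:
> > > > >> > > > // fields are matched by name, and fields that exist only in the
> > > > >> > > > // new schema are filled with null.
> > > > >> > > > static GenericRowData remapByName(
> > > > >> > > >         GenericRowData oldRow, String[] oldNames, String[] newNames) {
> > > > >> > > >     GenericRowData result = new GenericRowData(newNames.length);
> > > > >> > > >     for (int i = 0; i < newNames.length; i++) {
> > > > >> > > >         int oldPos = Arrays.asList(oldNames).indexOf(newNames[i]);
> > > > >> > > >         result.setField(i, oldPos >= 0 ? oldRow.getField(oldPos) : null);
> > > > >> > > >     }
> > > > >> > > >     return result;
> > > > >> > > > }
> > > > >> > > > ```
> > > > >> > > >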
> > > > >> > > > The proposed *migrateState* and *migrateElement* methods
> > > > >> > > > maintain backward compatibility with default implementations,
> > > > >> > > > while enabling RowData to perform specialized migration logic
> > > > >> > > > without requiring backend changes.
> > > > >> > > >
> > > > >> > > > I've updated the proposal document to include pseudo-code
> > > > >> > > > examples of *migrateState* and *migrateElement* in the
> > > > >> > > > *RowDataSerializerSnapshot* class to illustrate this. Let me
> > > > >> > > > know if I missed anything.
> > > > >> > > >
> > > > >> > > > Best regards,
> > > > >> > > > Weiqing
> > > > >> > > >
> > > > >> > > > On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > >> > > >
> > > > >> > > > > Hi, Weiqing
> > > > >> > > > >
> > > > >> > > > > Thanks for the FLIP! In general I'd +1 for schema evolution
> > > > >> > > > > for RowData types, which will enhance the user experience of
> > > > >> > > > > SQL jobs.
> > > > >> > > > >
> > > > >> > > > > I have one question for now:
> > > > >> > > > >
> > > > >> > > > > You suggested introducing new methods in
> > > > >> > > > > `TypeSerializerSnapshot`, but is it possible to leverage the
> > > > >> > > > > existing state migration procedure [1], which also performs
> > > > >> > > > > deserialization and serialization with the old and new
> > > > >> > > > > serializers respectively? IIUC, all we need is to properly
> > > > >> > > > > implement `resolveSchemaCompatibility` for
> > > > >> > > > > `RowDataSerializerSnapshot` [2], since it will be invoked
> > > > >> > > > > here [3]. No need for new methods, right?
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > [1] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > > > >> > > > > [2] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > > > >> > > > > [3] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > Best,
> > > > >> > > > > Zakelly
> > > > >> > > > >
> > > > >> > > > > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com> wrote:
> > > > >> > > > >
> > > > >> > > > > > Hi all,
> > > > >> > > > > >
> > > > >> > > > > > I'd like to initiate a discussion about enhancing state
> > > > >> > > > > > schema evolution support for RowData in Flink.
> > > > >> > > > > >
> > > > >> > > > > > *Motivation*
> > > > >> > > > > >
> > > > >> > > > > > Flink applications frequently need to evolve their state
> > > > >> > > > > > schema as business requirements change. Currently, when
> > > > >> > > > > > users update a Table API or SQL job with schema changes
> > > > >> > > > > > involving RowData types (particularly nested structures),
> > > > >> > > > > > they encounter serialization compatibility errors during
> > > > >> > > > > > state restoration, causing job failures. The issue occurs
> > > > >> > > > > > because existing state migration mechanisms don't properly
> > > > >> > > > > > handle RowData types during schema evolution, preventing
> > > > >> > > > > > users from making backward-compatible changes like:
> > > > >> > > > > >
> > > > >> > > > > >    - Adding nullable fields to existing structures
> > > > >> > > > > >    - Reordering fields within a row while preserving field names
> > > > >> > > > > >    - Evolving nested row structures
> > > > >> > > > > >
> > > > >> > > > > > This limitation impacts production applications using
> > > > >> > > > > > Flink's Table API, as the RowData type is central to this
> > > > >> > > > > > interface. Users are forced to choose between maintaining
> > > > >> > > > > > outdated schemas or reprocessing all state data when
> > > > >> > > > > > schema changes are required.
> > > > >> > > > > >
> > > > >> > > > > > Here's the proposal document:
> > > > >> > > > > > https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0
> > > > >> > > > > >
> > > > >> > > > > > Your feedback and ideas are welcome to refine this feature.
> > > > >> > > > > >
> > > > >> > > > > > Thanks,
> > > > >> > > > > > Weiqing
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Best,
> > > > >> Hangxiang.
> > > > >>
> > > > >
> > > >
> > >
> >
>
