Hi Weiqing. +1 for the FLIP. I have some suggestions about the FLIP:
1. Instead of adding a field named originalRowType to `RowDataSerializer`, I would prefer adding a field named fieldNames of type String[]. WDYT? As I understand it, this field is only used for name-based field mapping, so we only need to add the required names to the serializer.

2. The FLIP says that if the state corresponding to an operator's ID is retained in the state backend, the operator restores the state according to name-based field mapping. But I think name-based mapping is dangerous in some cases. For example, a user has an aggregation Aggregate(groupBy(a, b), SUM(c)) in their job. After a modification, the job has Aggregate(groupBy(a, b), SUM(d), SUM(c)). After the modification, SUM(d)'s agg buffer is named agg0_max and SUM(c)'s is agg1_max, but before the modification SUM(c)'s agg buffer was named agg0_max, so the old SUM(c) state would be restored into SUM(d). How about introducing an option that lets users decide whether to enable schema migration?

3. The example in the FLIP is not very good, because the current proposal still cannot migrate state if a type changes, right? The FLIP says: "Incompatible changes (removing fields, changing field types) will still be rejected with clear error messages".

4. It would be better to clarify, from the user's perspective, when schema migration works and when it does not. The current proposal mainly focuses on the operator perspective, which may not be user-friendly.

Best,
Shengkai

Weiqing Yang <yangweiqing...@gmail.com> wrote on Fri, Jul 18, 2025 at 14:42:

> Hi Zakelly and Hangxiang,
>
> Just checking in - do you have any concerns or feedback?
>
> If there are no further objections from anyone, I’ll mark the FLIP as ready
> for voting.
>
> Best,
> Weiqing
>
> On Sun, Jul 6, 2025 at 11:46 PM Weiqing Yang <yangweiqing...@gmail.com>
> wrote:
>
> > Hi Hangxiang, Zakelly,
> >
> > Thank you for the careful review and the +1 on the proposal.
> >
> > *1.
Where to host the migration logic*
> >
> > I experimented with placing the migration hook on
> > TypeSerializerSchemaCompatibility, but ran into two issues:
> >
> > - Is the "schemaEvolutionSerializer" intended to be the new
> >   TypeSerializer? The migration needs access to both the
> >   DataInputDeserializer (the value) and the new TypeSerializer.
> > - TypeSerializerSchemaCompatibility is currently designed as a result
> >   holder, not an executor, so keeping the procedural logic inside
> >   TypeSerializerSnapshot seems clearer.
> >
> > *2. Naming the snapshot field*
> >
> > I can change the field to `oldSerializerSnapshot` for consistency with
> > `resolveSchemaCompatibility()`, if you think that’s clearer. Note that
> > migrateState() will still require the new serializer, so the method
> > signature will remain migrateState(oldSnapshot, newSerializer, dataInput,
> > ...).
> >
> > *3. Need for migrateElement()*
> >
> > I initially tried relying only on migrateState(), but for RocksDBListState
> > the code became much less clean, as it stores list elements individually
> > with delimiters. A dedicated migrateElement() method keeps that migration
> > logic more readable (also slightly improves performance) for ListState.
> >
> > For context, I cherry-picked our internal PR (used in production on Flink
> > v1.16) that illustrates these points in practice:
> > https://github.com/weiqingy/flink/commit/00539b16cc55bcd144ba65c052142fbe6a556842
> >
> > I’m happy to iterate further - please let me know your thoughts.
> >
> > Thanks again!
> > Weiqing
> >
> > On Tue, May 6, 2025 at 11:54 PM Hangxiang Yu <master...@gmail.com> wrote:
> >
> >> Hi, Weiqing.
> >> Thanks for driving this FLIP.
> >> I'm +1 for supporting schema evolution for SQL RowData type.
> >>
> >> I just have some questions:
> >> 1.
Could we consider defining a method returning *SchemaEvolutionSerializer*
> >> in *TypeSerializerSchemaCompatibility* (like
> >> compatibleAfterMigration(TypeSerializer<T> schemaEvolutionSerializer))?
> >> Then we would only need to care about implementing the schema evolution
> >> in the form of a serializer (which could also be supported automatically,
> >> since we implement it internally and call it in the internal state).
> >> I think it may be better because *TypeSerializerSnapshot* is a common
> >> interface and many of its implementations may not really need
> >> *migrateState*.
> >>
> >> 2. Considering the semantics of *TypeSerializerSnapshot*, I'd also suggest
> >> changing the field to *oldSerializerSnapshot*, which is also consistent
> >> with *resolveSchemaCompatibility*.
> >>
> >> 3. Do we really need an extra *migrateElement* method? Or, if we go with
> >> the option of defining *SchemaEvolutionSerializer*, could the element
> >> schema evolution serializer just be a special *SchemaEvolutionSerializer*?
> >>
> >> On Tue, Apr 29, 2025 at 2:30 PM Weiqing Yang <yangweiqing...@gmail.com>
> >> wrote:
> >>
> >> > Thanks for the suggestions, Zakelly!
> >> >
> >> > Regarding *migrateElement* - it is specifically needed for ListState,
> >> > which stores elements individually with delimiters. Its implementation
> >> > deserializes and processes each element one by one during migration, so
> >> > I introduced the *migrateElement* API to handle this per-element
> >> > processing.
> >> >
> >> > Regarding the *migrateState* signature - I’m open to suggestions. My
> >> > original design aimed to align with the existing implementations in
> >> > RocksDBMapState, RocksDBListState, and AbstractRocksDBState.
> >> > For example, in RocksDBMapState (v1.20), the migrateSerializedValue
> >> > <https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243>
> >> > method first deserializes the old value and then serializes it with the
> >> > new serializer:
> >> >
> >> >     ...
> >> >     if (!isNull) {
> >> >         mapUserValue =
> >> >             priorMapValueSerializer.deserialize(serializedOldValueInput);
> >> >     }
> >> >     ...
> >> >     newMapValueSerializer.serialize(mapUserValue,
> >> >         serializedMigratedValueOutput);
> >> >
> >> > Here, *snapshotConfiguration.migrateState* is called as:
> >> >
> >> >     if (!isNull) {
> >> >         priorMapValueSerializer.snapshotConfiguration().migrateState(
> >> >             priorMapValueSerializer, newSerializer,
> >> >             serializedOldValueInput, serializedMigratedValueOutput);
> >> >     }
> >> >
> >> > The idea was to mirror this structure - delegate the migration logic
> >> > to *priorSerializer.snapshotConfiguration()*, passing both the prior and
> >> > new serializers.
> >> >
> >> > On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> >> >
> >> > > Hi Weiqing,
> >> > >
> >> > > Thanks for your answers!
> >> > >
> >> > > It seems a simple deserialization-serialization lacks flexibility, thus
> >> > > I'd agree to introduce new methods.
> >> > > I'd suggest changing the signature to:
> >> > > ```
> >> > > public void migrateState(
> >> > >     TypeSerializerSnapshot<T> oldSerializerSnapshot,
> >> > >     DataInputDeserializer in,
> >> > >     DataOutputSerializer out) throws IOException
> >> > > ```
> >> > > which is more aligned with other methods under `TypeSerializerSnapshot`.
> >> > > WDYT?
> >> > >
> >> > > And another question: Could you describe in which case we need
> >> > > `migrateElement`?
> >> > >
> >> > > Best,
> >> > > Zakelly
> >> > >
> >> > > On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Hi Zakelly,
> >> > > >
> >> > > > Thanks for your feedback.
> >> > > >
> >> > > > You're right - *resolveSchemaCompatibility* is critical for identifying
> >> > > > schema compatibility. However, our challenge extends beyond detection to
> >> > > > handling the actual migration process, particularly given RowData’s
> >> > > > complex requirements.
> >> > > >
> >> > > > The standard migration logic in *AbstractRocksDBState* isn't sufficient
> >> > > > for RowData because, during migration, we need to:
> >> > > >
> >> > > > - Add null values for newly added fields
> >> > > > - Reorder fields based on field names in the new schema
> >> > > > - Recursively handle nested structures
> >> > > > - Apply different migration logic depending on the state type (e.g.,
> >> > > >   *ListState* uses *migrateElement*(), *MapState* uses *migrateState*())
> >> > > >
> >> > > > The current approach:
> >> > > >
> >> > > >     V value = priorSerializer.deserialize(serializedOldValueInput);
> >> > > >     newSerializer.serialize(value, serializedMigratedValueOutput);
> >> > > >
> >> > > > doesn’t offer enough control for these needs.
> >> > > >
> >> > > > The proposed *migrateState* and *migrateElement* methods maintain
> >> > > > backward compatibility with default implementations, while enabling
> >> > > > RowData to perform specialized migration logic without requiring backend
> >> > > > changes.
> >> > > >
> >> > > > I’ve updated the proposal document to include pseudo-code examples of
> >> > > > *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot*
> >> > > > class to illustrate this. Let me know if I missed anything.
> >> > > >
> >> > > > Best regards,
> >> > > > Weiqing
> >> > > >
> >> > > > On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >> > > >
> >> > > > > Hi, Weiqing
> >> > > > >
> >> > > > > Thanks for the FLIP! In general I'd +1 for schema evolution for RowData
> >> > > > > types, which will enhance the user experience of SQL jobs.
> >> > > > >
> >> > > > > I have one question for now:
> >> > > > >
> >> > > > > You suggested introducing new methods in `TypeSerializerSnapshot`, but
> >> > > > > is it possible to leverage the existing state migration procedure[1],
> >> > > > > which also performs deserialization and serialization with the old and
> >> > > > > new serializers respectively? IIUC, all we need is to properly implement
> >> > > > > `resolveSchemaCompatibility` for `RowDataSerializerSnapshot`[2], since it
> >> > > > > will be invoked here[3]. No need for new methods, right?
> >> > > > >
> >> > > > > [1] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> >> > > > > [2] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> >> > > > > [3] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> >> > > > >
> >> > > > > Best,
> >> > > > > Zakelly
> >> > > > >
> >> > > > > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi all,
> >> > > > > >
> >> > > > > > I’d like to initiate a discussion about enhancing state schema
> >> > > > > > evolution support for RowData in Flink.
> >> > > > > >
> >> > > > > > *Motivation*
> >> > > > > >
> >> > > > > > Flink applications frequently need to evolve their state schema as
> >> > > > > > business requirements change. Currently, when users update a Table API
> >> > > > > > or SQL job with schema changes involving RowData types (particularly
> >> > > > > > nested structures), they encounter serialization compatibility errors
> >> > > > > > during state restoration, causing job failures. The issue occurs because
> >> > > > > > existing state migration mechanisms don't properly handle RowData types
> >> > > > > > during schema evolution, preventing users from making
> >> > > > > > backward-compatible changes like:
> >> > > > > >
> >> > > > > > - Adding nullable fields to existing structures
> >> > > > > > - Reordering fields within a row while preserving field names
> >> > > > > > - Evolving nested row structures
> >> > > > > >
> >> > > > > > This limitation impacts production applications using Flink's Table
> >> > > > > > API, as the RowData type is central to this interface. Users are forced
> >> > > > > > to choose between maintaining outdated schemas or reprocessing all state
> >> > > > > > data when schema changes are required.
> >> > > > > >
> >> > > > > > Here’s the proposal document: Link
> >> > > > > > <https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0>
> >> > > > > >
> >> > > > > > Your feedback and ideas are welcome to refine this feature.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Weiqing
> >>
> >> --
> >> Best,
> >> Hangxiang.
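
To make the risk with name-based field mapping concrete (the Aggregate example in point 2 of the reply at the top of this thread), here is a minimal Java sketch. It is only an illustration under stated assumptions: `NameBasedRowMigration` and `migrate` are hypothetical names, not Flink APIs, and plain arrays stand in for RowData and its field names. It covers only the "add null for new fields" and "reorder by name" behaviours discussed in the thread; it does not reject removed fields or changed types, and it does not handle nested rows.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of Flink: maps old row values onto a new schema by field name. */
final class NameBasedRowMigration {

    /**
     * Builds a row for the new schema. Each new field takes the value of the old
     * field with the same name, or null when the old schema has no such field.
     */
    static Object[] migrate(String[] oldNames, Object[] oldValues, String[] newNames) {
        if (oldNames.length != oldValues.length) {
            throw new IllegalArgumentException("old names and values must have the same length");
        }
        // Index the old fields by name so lookups do not depend on position.
        Map<String, Integer> oldIndex = new HashMap<>();
        for (int i = 0; i < oldNames.length; i++) {
            oldIndex.put(oldNames[i], i);
        }
        Object[] migrated = new Object[newNames.length];
        for (int i = 0; i < newNames.length; i++) {
            Integer pos = oldIndex.get(newNames[i]);
            migrated[i] = (pos == null) ? null : oldValues[pos];
        }
        return migrated;
    }

    public static void main(String[] args) {
        // Old job: Aggregate(groupBy(a, b), SUM(c)); the SUM(c) buffer is named agg0_max.
        String[] oldNames = {"a", "b", "agg0_max"};
        Object[] oldRow = {1, 2, 100L};

        // New job: Aggregate(groupBy(a, b), SUM(d), SUM(c));
        // agg0_max now belongs to SUM(d) and agg1_max to SUM(c).
        String[] newNames = {"a", "b", "agg0_max", "agg1_max"};

        // Prints [1, 2, 100, null]: the old SUM(c) value lands in the SUM(d) buffer.
        System.out.println(Arrays.toString(migrate(oldNames, oldRow, newNames)));
    }
}
```

The mapping itself behaves as designed here. The problem is that generated buffer names such as agg0_max are positional, so a name match does not guarantee that the field means the same thing before and after the change. That is the case an opt-in switch for schema migration would guard against.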