Hi Weiqing, I like the idea. Could you give an example of how a Kafka or MySQL connector would use this to read data with different schemas, for better understanding?
Best,
Hongshun

On Thu, Jul 24, 2025 at 10:34 AM Shengkai Fang <fskm...@gmail.com> wrote:

Hi, Weiqing.

Thanks for the update. It would be better to update the cwiki rather than the Google doc.

After reading the doc, I feel this feature is hard for users to apply, because few users understand the SQL operator state structure, and in some cases the operator state structure is related to the input operator schema (e.g., the Join operator). My suggestion is that the FLIP should focus on the problem where the input schema changes while the SQL is unchanged.

If we assume the SQL is unchanged and the input schema changes, in most cases the planner will prune the unused columns. Could you give me a detailed example where name-based state migration works?

Best,
Shengkai

Weiqing Yang <yangweiqing...@gmail.com> wrote on Wed, Jul 23, 2025 at 12:03:

Hi Shengkai,

Thank you for your detailed feedback! I've updated the proposal to incorporate your suggestions:

1. fieldNames vs. originalRowType: I agree that using fieldNames: String[] is a more focused and lightweight approach than storing the full originalRowType. The proposal has been updated accordingly.

2. Potential risks with name-based mapping: You raised a good point regarding risks in scenarios such as SQL query changes on aggregation. To mitigate this, I've introduced a configuration option state.schema-evolution.enable (default: false), allowing users to explicitly opt in when they've verified that schema evolution is safe for their use case.

3. Examples clarification: To better illustrate the boundary between supported and unsupported changes, I've expanded the examples section. It now clearly distinguishes supported cases (e.g., adding nullable fields, reordering) from unsupported ones (e.g., removing fields, changing field types).

4. User perspective: I've also updated the proposal to clarify when schema evolution is supported and when it isn't, along with the expected user workflow. This should help make the feature easier to understand and adopt.

Please let me know if you have any further suggestions or questions. Thanks again for your valuable input.

Best,
Weiqing

On Sun, Jul 20, 2025 at 10:14 PM Shengkai Fang <fskm...@gmail.com> wrote:

Hi Weiqing.

+1 for the FLIP. I have some suggestions about the FLIP:

1. Compared to adding a field named originalRowType in RowDataSerializer, I prefer adding a field named fieldNames with type String[]. WDYT? This field is used for name-based field mapping, so we only need to add the required names to the serializer.

2. The FLIP shows that if the state corresponding to an operator's ID is retained in the state backend, the operator restores the state according to name-based field mapping. But I think name-based field mapping is dangerous in some cases. For example, a job contains an aggregation Aggregate(groupBy(a, b), SUM(c)). After modification, the job has Aggregate(groupBy(a, b), SUM(d), SUM(c)). Now SUM(d)'s agg buffer name is agg0_max and SUM(c)'s is agg1_max, but before the modification SUM(c)'s agg buffer name was agg0_max. How about introducing an option that lets users decide whether to enable schema migration?

3. The example in the FLIP is not very good, because the current proposal still cannot migrate state when a field type changes — the FLIP writes "Incompatible changes (removing fields, changing field types) will still be rejected with clear error messages".

4. It would be better to clarify when schema migration works and when it does not from the user's perspective. The current proposal mainly takes the operator perspective, which may not be user-friendly.

Best,
Shengkai

Weiqing Yang <yangweiqing...@gmail.com> wrote on Fri, Jul 18, 2025 at 14:42:

Hi Zakelly and Hangxiang,

Just checking in - do you have any concerns or feedback? If there are no further objections from anyone, I'll mark the FLIP as ready for voting.

Best,
Weiqing

On Sun, Jul 6, 2025 at 11:46 PM Weiqing Yang <yangweiqing...@gmail.com> wrote:

Hi Hangxiang, Zakelly,

Thank you for the careful review and the +1 on the proposal.

1. Where to host the migration logic

I experimented with placing the migration hook on TypeSerializerSchemaCompatibility, but ran into two issues:

- Is the "schemaEvolutionSerializer" intended to be the new TypeSerializer? The migration needs access to both the DataInputDeserializer (the value) and the new TypeSerializer.
- TypeSerializerSchemaCompatibility is currently designed as a result holder, not an executor, so keeping the procedural logic inside TypeSerializerSnapshot seems clearer.

2. Naming the snapshot field

I can change the field to `oldSerializerSnapshot` for consistency with `resolveSchemaCompatibility()`, if you think that's clearer. Note that migrateState() will still require the new serializer, so the method signature will remain migrateState(oldSnapshot, newSerializer, dataInput, ...).

3. Need for migrateElement()

I initially tried relying only on migrateState(), but for RocksDBListState the code became much less clean, as it stores list elements individually with delimiters. A dedicated migrateElement() method keeps that migration logic more readable (and slightly faster) for ListState.

For context, I cherry-picked our internal PR (used in production on Flink v1.16) that illustrates these points in practice:
https://github.com/weiqingy/flink/commit/00539b16cc55bcd144ba65c052142fbe6a556842

I'm happy to iterate further - please let me know your thoughts.

Thanks again!
Weiqing

On Tue, May 6, 2025 at 11:54 PM Hangxiang Yu <master...@gmail.com> wrote:

Hi, Weiqing.

Thanks for driving this FLIP. I'm +1 for supporting schema evolution for the SQL RowData type.

I just have some questions:

1. Could we consider defining a method returning SchemaEvolutionSerializer in TypeSerializerSchemaCompatibility (like compatibleAfterMigration(TypeSerializer<T> schemaEvolutionSerializer))? Then we would only need to implement the schema evolution as part of the serializer format (which could also be supported automatically, since we implement it inside and call it in the internal state). I think this may be better because TypeSerializerSnapshot is a common interface and many of its implementations may not really need migrateState.

2. Considering the semantics of TypeSerializerSnapshot, I'd also suggest renaming the field to oldSerializerSnapshot, which is consistent with resolveSchemaCompatibility.

3. Do we really need an extra migrateElement method? Or, if we go with the option of defining SchemaEvolutionSerializer, could the element schema evolution serializer just be a special SchemaEvolutionSerializer?

Best,
Hangxiang

On Tue, Apr 29, 2025 at 2:30 PM Weiqing Yang <yangweiqing...@gmail.com> wrote:

Thanks for the suggestions, Zakelly!

Regarding migrateElement - it is specifically needed for ListState, which stores elements individually with delimiters. Its implementation deserializes and processes each element one by one during migration, so I introduced the migrateElement API to handle this per-element processing.

Regarding the migrateState signature - I'm open to suggestions. My original design aimed to align with the existing implementations in RocksDBMapState, RocksDBListState, and AbstractRocksDBState. For example, in RocksDBMapState (v1.20), the migrateSerializedValue method
<https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243>
first deserializes the old value and then serializes it with the new serializer:

```
...
if (!isNull) {
    mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput);
}
...
newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput);
```

Here, snapshotConfiguration().migrateState is called as:

```
if (!isNull) {
    priorMapValueSerializer.snapshotConfiguration().migrateState(
            priorMapValueSerializer, newSerializer,
            serializedOldValueInput, serializedMigratedValueOutput);
}
```

The idea was to mirror this structure - delegate the migration logic to priorSerializer.snapshotConfiguration(), passing both the prior and new serializers.

On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Weiqing,

Thanks for your answers!

It seems a simple deserialization-serialization round trip lacks flexibility, so I'd agree to introduce new methods. I'd suggest changing the signature to:

```
public void migrateState(
        TypeSerializerSnapshot<T> oldSerializerSnapshot,
        DataInputDeserializer in,
        DataOutputSerializer out) throws IOException
```

which is more aligned with the other methods under `TypeSerializerSnapshot`. WDYT?

And another question: could you describe in which cases we need `migrateElement`?

Best,
Zakelly

On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com> wrote:

Hi Zakelly,

Thanks for your feedback.

You're right - resolveSchemaCompatibility is critical for identifying schema compatibility. However, our challenge extends beyond detection to handling the actual migration process, particularly given RowData's complex requirements.

The standard migration logic in AbstractRocksDBState isn't sufficient for RowData because, during migration, we need to:

- Add null values for newly added fields
- Reorder fields based on field names in the new schema
- Recursively handle nested structures
- Apply different migration logic depending on the state type (e.g., ListState uses migrateElement(), MapState uses migrateState())

The current approach:

```
V value = priorSerializer.deserialize(serializedOldValueInput);
newSerializer.serialize(value, serializedMigratedValueOutput);
```

doesn't offer enough control for these needs.

The proposed migrateState and migrateElement methods maintain backward compatibility through default implementations, while enabling RowData to perform specialized migration logic without requiring backend changes.

I've updated the proposal document to include pseudo-code examples of migrateState and migrateElement in the RowDataSerializerSnapshot class to illustrate this. Let me know if I missed anything.

Best regards,
Weiqing

On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi, Weiqing

Thanks for the FLIP! In general I'd +1 for schema evolution for RowData types, which will enhance the user experience of SQL jobs.

I have one question for now: you suggested introducing new methods in `TypeSerializerSnapshot`, but is it possible to leverage the existing state migration procedure [1], which also performs deserialization and serialization with the old and new serializers correspondingly? IIUC, all we need is to properly implement `resolveSchemaCompatibility` for `RowDataSerializerSnapshot` [2], since it will be invoked here [3]. No need for new methods, right?

[1] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
[2] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
[3] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312

Best,
Zakelly

On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com> wrote:

Hi all,

I'd like to initiate a discussion about enhancing state schema evolution support for RowData in Flink.

Motivation

Flink applications frequently need to evolve their state schema as business requirements change. Currently, when users update a Table API or SQL job with schema changes involving RowData types (particularly nested structures), they encounter serialization compatibility errors during state restoration, causing job failures. The issue occurs because existing state migration mechanisms don't properly handle RowData types during schema evolution, preventing users from making backward-compatible changes like:

- Adding nullable fields to existing structures
- Reordering fields within a row while preserving field names
- Evolving nested row structures

This limitation impacts production applications using Flink's Table API, as the RowData type is central to this interface. Users are forced to choose between maintaining outdated schemas or reprocessing all state data when schema changes are required.

Here's the proposal document:
https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0

Your feedback and ideas are welcome to refine this feature.

Thanks,
Weiqing
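The name-based field mapping that the thread keeps returning to (fill newly added fields with null, reorder existing fields by name) can be sketched as follows. This is an illustrative sketch only: the class and method names are hypothetical, and real RowData migration works on serialized bytes rather than plain object arrays.

```java
import java.util.Arrays;

// Sketch of name-based field mapping: values from the old row are matched to
// the new schema by field name; fields present only in the new schema are
// filled with null (this is why only nullable additions can be supported).
public class NameBasedMapping {

    public static Object[] migrateByName(
            String[] oldFieldNames, Object[] oldValues, String[] newFieldNames) {
        Object[] newValues = new Object[newFieldNames.length];
        for (int i = 0; i < newFieldNames.length; i++) {
            int oldPos = Arrays.asList(oldFieldNames).indexOf(newFieldNames[i]);
            // Known field: copy by name; newly added field: null.
            newValues[i] = oldPos >= 0 ? oldValues[oldPos] : null;
        }
        return newValues;
    }

    public static void main(String[] args) {
        // Old schema (a, b); new schema reorders them and adds a nullable c.
        Object[] migrated = migrateByName(
                new String[] {"a", "b"},
                new Object[] {1, "x"},
                new String[] {"b", "a", "c"});
        System.out.println(Arrays.toString(migrated)); // [x, 1, null]
    }
}
```

The same mapping would be applied recursively for nested row types.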
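Shengkai's aggregation hazard can be made concrete with a toy model of positional agg-buffer naming. The `aggN_buffer` naming scheme below only mimics the pattern described in the thread (the actual planner-generated names differ); the point is that inserting a new aggregate call shifts the positional names of existing buffers, so a purely name-based restore would silently attach old state to the wrong aggregate.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model: agg buffer fields are named by position in the aggregate list,
// so the "name" of an aggregate's buffer is not stable across query edits.
public class AggBufferNames {

    public static Map<String, String> bufferNames(String... aggCalls) {
        Map<String, String> names = new LinkedHashMap<>();
        for (int i = 0; i < aggCalls.length; i++) {
            names.put(aggCalls[i], "agg" + i + "_buffer");
        }
        return names;
    }

    public static void main(String[] args) {
        // Before: Aggregate(groupBy(a, b), SUM(c))
        Map<String, String> before = bufferNames("SUM(c)");
        // After:  Aggregate(groupBy(a, b), SUM(d), SUM(c))
        Map<String, String> after = bufferNames("SUM(d)", "SUM(c)");
        // SUM(c)'s buffer moves from agg0 to agg1; state saved under
        // "agg0_buffer" would now be matched, by name, to SUM(d).
        System.out.println(before.get("SUM(c)")); // agg0_buffer
        System.out.println(after.get("SUM(c)"));  // agg1_buffer
    }
}
```

This is the scenario the opt-in `state.schema-evolution.enable` option (default: false) is meant to guard against: the user must verify that no such positional renaming occurred before enabling migration.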
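Why a separate migrateElement() hook for list-style state can be sketched like this: the backend stores list elements individually (RocksDB's ListState joins serialized elements with a delimiter), so migration walks the elements and applies an element-level hook to each one, instead of one whole-value migrateState() call. Everything here is a simplified stand-in: real element payloads are bytes, not delimiter-joined strings, and `ElementMigrator`/`migrateList` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of per-element list-state migration: split on the storage delimiter,
// migrate each element, rejoin. A single whole-value hook would force every
// implementation to reinvent this splitting logic.
public class ListStateMigration {

    interface ElementMigrator {
        String migrateElement(String oldElement);
    }

    public static String migrateList(String serializedList, char delimiter, ElementMigrator m) {
        List<String> migrated = new ArrayList<>();
        // Note: works here because ';' is not a regex metacharacter.
        for (String element : serializedList.split(String.valueOf(delimiter))) {
            migrated.add(m.migrateElement(element));
        }
        return String.join(String.valueOf(delimiter), migrated);
    }

    public static void main(String[] args) {
        // Each element gains a newly added nullable field, shown as ",null".
        String out = migrateList("1;2;3", ';', e -> e + ",null");
        System.out.println(out); // 1,null;2,null;3,null
    }
}
```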
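Finally, the supported/unsupported boundary discussed above (nullable additions and reorders can be migrated; removed fields and changed types are rejected) can be sketched as a compatibility check. The enum and method below are hypothetical stand-ins for Flink's TypeSerializerSchemaCompatibility results, using plain strings for types to keep the sketch self-contained.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of resolveSchemaCompatibility for name-based row evolution:
//  - every old field must still exist with the same type, else INCOMPATIBLE;
//  - identical layout is AS_IS; otherwise (reorder/add) AFTER_MIGRATION.
public class SchemaCompatibilityCheck {

    enum Compatibility { AS_IS, AFTER_MIGRATION, INCOMPATIBLE }

    public static Compatibility resolve(
            String[] oldNames, String[] oldTypes, String[] newNames, String[] newTypes) {
        List<String> newNameList = Arrays.asList(newNames);
        for (int i = 0; i < oldNames.length; i++) {
            int pos = newNameList.indexOf(oldNames[i]);
            if (pos < 0) {
                return Compatibility.INCOMPATIBLE; // field removed
            }
            if (!oldTypes[i].equals(newTypes[pos])) {
                return Compatibility.INCOMPATIBLE; // field type changed
            }
        }
        boolean sameLayout = Arrays.equals(oldNames, newNames);
        return sameLayout ? Compatibility.AS_IS : Compatibility.AFTER_MIGRATION;
    }

    public static void main(String[] args) {
        // Reorder (a, b) -> (b, a) and add a new field c: migratable.
        System.out.println(resolve(
                new String[] {"a", "b"}, new String[] {"INT", "STRING"},
                new String[] {"b", "a", "c"}, new String[] {"STRING", "INT", "DOUBLE"}));
        // Prints AFTER_MIGRATION
    }
}
```

In the FLIP's terms, only the AFTER_MIGRATION path would invoke the proposed migrateState()/migrateElement() hooks.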