Hi Timo, +1 for the improvement.
Best,
Godfrey

Timo Walther <twal...@apache.org> wrote on Fri, 10 Dec 2021 at 20:37:
>
> Hi Wenlong,
>
> yes it will. Sorry for the confusion. This is a logical consequence of the assumption:
>
> The JSON plan contains no implementation details (i.e. no classes) and is fully declarative.
>
> I will add a remark.
>
> Thanks,
> Timo
>
>
> On 10.12.21 11:43, wenlong.lwl wrote:
> > Hi, Timo, thanks for the explanation. I totally agree with what you said.
> > My actual question is: Will the version of an exec node be serialised in the JSON plan?
> > In my understanding, it is not in the former design. If it is yes, my question is solved already.
> >
> > Best,
> > Wenlong
> >
> > On Fri, 10 Dec 2021 at 18:15, Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Wenlong,
> >>
> >> I also thought about adding a `flinkVersion` field per ExecNode. But this is not necessary, because the `version` of the ExecNode has the same purpose.
> >>
> >> The plan version just encodes that:
> >> "plan has been updated in Flink 1.17" / "plan is entirely valid for Flink 1.17"
> >>
> >> The ExecNode version maps to `minStateVersion` to verify state compatibility.
> >>
> >> So even if the plan version is 1.17, some ExecNodes may use the state layout of 1.15.
> >>
> >> It is totally fine to only update the ExecNode to version 2 and not 3 in your example.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 10.12.21 06:02, wenlong.lwl wrote:
> >>> Hi, Timo, thanks for updating the doc.
> >>>
> >>> I have a comment on plan migration:
> >>> I think we may need to add a version field for every exec node when serialising.
> >>> In earlier discussions, I think we reached the conclusion of treating the version of the plan as the version of the node, but in this case that would be broken.
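The rule described above (one global plan version, plus a per-node ExecNode version that maps to a minimum state version) can be sketched as a small compatibility check. This is a minimal, framework-free sketch under stated assumptions; the class and method names (`PlanCompatibility`, `isRestorable`) are hypothetical and not the actual FLIP-190 implementation:

```java
import java.util.List;

final class PlanCompatibility {

    /**
     * A restore is rejected if any ExecNode in the plan requires a state
     * layout newer than the Flink version that wrote the savepoint.
     */
    static boolean isRestorable(String savepointVersion, List<String> minStateVersions) {
        for (String min : minStateVersions) {
            if (compareVersions(min, savepointVersion) > 0) {
                return false; // this node's state layout is newer than the savepoint
            }
        }
        return true;
    }

    /** Compares dotted version strings numerically, e.g. "1.15" < "1.16". */
    static int compareVersions(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }
}
```

So a 1.17 plan whose nodes declare minimum state versions 1.15 and 1.16 is restorable from a 1.17 savepoint, but not from a 1.15 one.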
> >>> Take the following example in the FLIP into consideration; there is a bad case:
> >>> when in 1.17 we introduce an incompatible version 3 and drop version 1, we can only update the version to 2, so the version should be per exec node.
> >>>
> >>> ExecNode version *1* is not supported anymore. Even though the state is actually compatible, the plan restore will fail with a helpful exception that forces users to perform plan migration.
> >>>
> >>> COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';
> >>>
> >>> The plan migration will safely replace the old version *1* with *2*. The JSON plan flinkVersion changes to 1.17.
> >>>
> >>> Best,
> >>>
> >>> Wenlong
> >>>
> >>> On Thu, 9 Dec 2021 at 18:36, Timo Walther <twal...@apache.org> wrote:
> >>>
> >>>> Hi Jing and Godfrey,
> >>>>
> >>>> I had another iteration over the document. There are two major changes:
> >>>>
> >>>> 1. Supported Flink Upgrade Versions
> >>>>
> >>>> I got the feedback via various channels that a step size of one minor version is not very convenient. As you said, "because upgrading to a new version is a time-consuming process". I rephrased this section:
> >>>>
> >>>> Upgrading usually involves work, which is why many users perform this task rarely (e.g. only once per year). Also, skipping versions is common until a new feature has been introduced for which it is worth upgrading. We will support the upgrade to the most recent Flink version from a set of previous versions. We aim to support upgrades from the last 2-3 releases on a best-effort basis; maybe even more depending on the maintenance overhead. However, in order not to grow the testing matrix infinitely and to perform important refactoring if necessary, we only guarantee upgrades with a step size of a single minor version (i.e. a cascade of upgrades).
> >>>>
> >>>> 2.
Annotation Design
> >>>>
> >>>> I also adopted the multiple-annotations design for the previous supportPlanFormat. So no array of versions anymore. I reworked the section; please have a look at the updated examples:
> >>>>
> >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-ExecNodeTests
> >>>>
> >>>> I also got the feedback offline that `savepoint` might not be the right terminology for the annotation. I changed that to minPlanVersion and minStateVersion.
> >>>>
> >>>> Let me know what you think.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> On 09.12.21 08:44, Jing Zhang wrote:
> >>>>> Hi Timo,
> >>>>> Thanks a lot for driving this discussion.
> >>>>> I believe it could solve many problems that we are suffering from when upgrading.
> >>>>>
> >>>>> I only have a little complaint about the following point.
> >>>>>
> >>>>>> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>
> >>>>> In our internal production environment, we follow up with the community's latest stable release version almost once a year because upgrading to a new version is a time-consuming process.
> >>>>> So we might skip 1-3 versions when we upgrade to the latest version. This might happen at other companies too.
> >>>>> Could we guarantee that FLIP-190 works if we skip fewer minor versions than a specified threshold?
> >>>>> Then we could know which version is good for us when preparing an upgrade.
> >>>>>
> >>>>> Best,
> >>>>> Jing Zhang
> >>>>>
> >>>>> godfrey he <godfre...@gmail.com> wrote on Wed, 8 Dec 2021 at 22:16:
> >>>>>
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> Thanks for the explanation, it's much clearer now.
> >>>>>>
> >>>>>> One thing I want to confirm about `supportedPlanFormat` and `supportedSavepointFormat`:
> >>>>>> `supportedPlanFormat` supports multiple versions, while `supportedSavepointFormat` supports only one version?
> >>>>>> A JSON plan can be deserialized by multiple versions because default values will be set for new fields.
> >>>>>> In theory, a savepoint can be restored by more than one version of the operators even if a state layout is changed, such as by deleting a whole state and starting the job with `allowNonRestoredState`=true.
> >>>>>> I think this is a corner case, and it's hard to understand compared to `supportedPlanFormat` supporting multiple versions.
> >>>>>> So, for most cases, when the state layout is changed, the savepoint is incompatible, and `supportedSavepointFormat` and the version need to be changed.
> >>>>>>
> >>>>>> I think we need a detailed explanation of the annotation change story in the Javadoc of the `ExecNodeMetadata` class for all developers (esp. those unfamiliar with this part).
> >>>>>>
> >>>>>> Best,
> >>>>>> Godfrey
> >>>>>>
> >>>>>> Timo Walther <twal...@apache.org> wrote on Wed, 8 Dec 2021 at 16:57:
> >>>>>>>
> >>>>>>> Hi Wenlong,
> >>>>>>>
> >>>>>>> thanks for the feedback. Great that we reached consensus here. I will update the entire document with my previous example shortly.
> >>>>>>>
> >>>>>>> > if we don't update the version when the plan format changes, we can't find that the plan cannot be deserialized in 1.15
> >>>>>>>
> >>>>>>> This should not be a problem as the entire plan file has a version as well. We should not allow reading a 1.16 plan in 1.15. We can throw a helpful exception early.
> >>>>>>>
> >>>>>>> Reading a 1.15 plan in 1.16 is possible until we drop the old `supportedPlanFormat` from one of the used ExecNodes.
Afterwards, all `supportedPlanFormat` values of ExecNodes must be equal to or higher than the plan version.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> On 08.12.21 03:07, wenlong.lwl wrote:
> >>>>>>>> Hi, Timo, +1 for multiple metadata annotations.
> >>>>>>>>
> >>>>>>>> The compatible change I meant in the last email is the slight state change example you gave, so we have actually reached consensus on this, IMO.
> >>>>>>>>
> >>>>>>>> Another question based on the example you gave:
> >>>>>>>> In the example "JSON node gets an additional property in 1.16", if we don't update the version when the plan format changes, we can't find out that the plan cannot be deserialized in 1.15, although the savepoint state is compatible.
> >>>>>>>> The error message may not be so friendly if we just throw a deserialization failure.
> >>>>>>>>
> >>>>>>>> On Tue, 7 Dec 2021 at 16:49, Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Wenlong,
> >>>>>>>>>
> >>>>>>>>> > First, we add a newStateLayout because of some improvement in state; in order to keep compatibility we may still keep the old state for the first version. We need to update the version, so that we can generate a new-version plan for the new job and keep the exec node compatible with the old-version plan.
> >>>>>>>>>
> >>>>>>>>> The problem that I see here for contributors is that the actual update of a version is more complicated than just updating an integer value. It means copying a lot of ExecNode code for a change that happens locally in an operator. Let's assume multiple ExecNodes use a similar operator.
> >>>>>>>>> Why do we need to update all ExecNode versions if the operator itself can deal with the incompatibility? The ExecNode version is meant for topology changes or fundamental state changes.
> >>>>>>>>>
> >>>>>>>>> If we don't find consensus on this topic, I would at least vote for supporting multiple annotations on an ExecNode class. This way we don't need to copy code but only add two ExecNode annotations with different ExecNode versions.
> >>>>>>>>>
> >>>>>>>>> > Maybe we can add support for this case:
> >>>>>>>>> > when an exec node is changed in 1.16, but is compatible with 1.15,
> >>>>>>>>> > we can use the node of 1.16 to deserialize the plan of 1.15.
> >>>>>>>>>
> >>>>>>>>> If the ExecNode is compatible, there is no reason to increase the ExecNode version.
> >>>>>>>>>
> >>>>>>>>> I tried to come up with a reworked solution to make all parties happy:
> >>>>>>>>>
> >>>>>>>>> 1. Let's assume the following annotations:
> >>>>>>>>>
> >>>>>>>>> supportedPlanFormat = [1.15]
> >>>>>>>>>
> >>>>>>>>> supportedSavepointFormat = 1.15
> >>>>>>>>>
> >>>>>>>>> We drop `added` as it is equal to `supportedSavepointFormat`.
> >>>>>>>>>
> >>>>>>>>> 2.
Multiple annotations over ExecNodes are possible:
> >>>>>>>>>
> >>>>>>>>> // operator state changes
> >>>>>>>>>
> >>>>>>>>> // initial introduction in 1.15
> >>>>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>>>>>>   supportedSavepointFormat=1.15)
> >>>>>>>>>
> >>>>>>>>> // state layout changed slightly in 1.16
> >>>>>>>>> // - operator migration is possible
> >>>>>>>>> // - operator supports state of both versions and will perform operator state migration
> >>>>>>>>> // - new plans will get the new ExecNode version
> >>>>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>>>>>>   supportedSavepointFormat=1.15)
> >>>>>>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>>>>>>   supportedSavepointFormat=1.16)
> >>>>>>>>>
> >>>>>>>>> // we force a plan migration in 1.17
> >>>>>>>>> // - we assume that all operator states have been migrated in the previous version
> >>>>>>>>> // - we can safely replace the old version `1` with `2` and only keep the new savepoint format
> >>>>>>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>>>>>>   supportedSavepointFormat=1.16)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> // plan changes
> >>>>>>>>>
> >>>>>>>>> // initial introduction in 1.15
> >>>>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>>>>>>   supportedSavepointFormat=1.15)
> >>>>>>>>>
> >>>>>>>>> // JSON node gets an additional property in 1.16
> >>>>>>>>> // e.g.
{ some-prop: 42 } -> { some-prop: 42, some-flag: false }
> >>>>>>>>> // - ExecNode version does not change
> >>>>>>>>> // - ExecNode version only changes when topology or state is affected
> >>>>>>>>> // - we support both JSON plan formats, the old and the newest one
> >>>>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16],
> >>>>>>>>>   supportedSavepointFormat=1.15)
> >>>>>>>>>
> >>>>>>>>> // we force a plan migration in 1.17
> >>>>>>>>> // - now we only support the 1.16 plan format
> >>>>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16,
> >>>>>>>>>   supportedSavepointFormat=1.15)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> // topology change
> >>>>>>>>>
> >>>>>>>>> // initial introduction in 1.15
> >>>>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>>>>>>   supportedSavepointFormat=1.15)
> >>>>>>>>>
> >>>>>>>>> // complete new class structure in 1.16 annotated with
> >>>>>>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>>>>>>   supportedSavepointFormat=1.16)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> What do you think?
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Timo
> >>>>>>>>>
> >>>>>>>>> On 07.12.21 08:20, wenlong.lwl wrote:
> >>>>>>>>>> Maybe we can add support for this case:
> >>>>>>>>>> when an exec node is changed in 1.16, but is compatible with 1.15,
> >>>>>>>>>> we can use the node of 1.16 to deserialize the plan of 1.15.
> >>>>>>>>>> This way, we don't need to fork the code if the change is compatible, and can avoid forking code frequently.
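For reference, the multiple-annotation design shown in the examples above maps naturally onto Java's `@Repeatable` mechanism. The declaration below is only a sketch: the member names mirror the examples (`supportedPlanFormat` as an array, `supportedSavepointFormat` as a single version), but the real Flink annotation may differ.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// The repeatable metadata annotation; `supportedPlanFormat` is an array so a
// node version can accept several plan formats, as in the examples above.
@Repeatable(ExecNodeMetadatas.class)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecNodeMetadata {
    String name();
    int version();
    String[] supportedPlanFormat();
    String supportedSavepointFormat();
}

// Container annotation required by @Repeatable.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecNodeMetadatas {
    ExecNodeMetadata[] value();
}

// Usage mirroring the "state layout changed slightly in 1.16" case: two
// metadata entries on one class instead of two copies of the ExecNode code.
@ExecNodeMetadata(name = "A", version = 1,
        supportedPlanFormat = "1.15", supportedSavepointFormat = "1.15")
@ExecNodeMetadata(name = "A", version = 2,
        supportedPlanFormat = "1.15", supportedSavepointFormat = "1.16")
class StreamExecNodeA {}
```

The framework can then read all entries via `getAnnotationsByType(ExecNodeMetadata.class)` and pick the matching version, which is exactly what makes code duplication unnecessary.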
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Wenlong
> >>>>>>>>>>
> >>>>>>>>>> On Tue, 7 Dec 2021 at 15:08, wenlong.lwl <wenlong88....@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi, Timo, I would prefer to update the version every time we change the state layout too.
> >>>>>>>>>>>
> >>>>>>>>>>> It could be possible that we change the exec node in 2 steps:
> >>>>>>>>>>> First, we add a newStateLayout because of some improvement in state; in order to keep compatibility we may still keep the old state for the first version. We need to update the version, so that we can generate a new-version plan for the new job and keep the exec node compatible with the old-version plan.
> >>>>>>>>>>> After some versions, we may remove the old state layout and clean up the deprecated code. We still need to update the version, so that we can verify that we are compatible with the plan after the first change, but not compatible with earlier plans.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Wenlong
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, 6 Dec 2021 at 21:27, Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Godfrey,
> >>>>>>>>>>>>
> >>>>>>>>>>>> > design makes things more complex.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Yes, the design might be a bit more complex. But operator migration is way easier than ExecNode migration at a later point in time for code maintenance. We know that ExecNodes can become pretty complex. Even though we have put a lot of code into `CommonXXExecNode`, it will be a lot of work to maintain multiple versions of ExecNodes.
If we can avoid this with operator state migration, it should always be preferred over a new ExecNode version.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm aware that operator state migration might only be important for roughly 10% of all changes. A new ExecNode version will be used for 90% of all changes.
> >>>>>>>>>>>>
> >>>>>>>>>>>> > If there are multiple state layouts, which layout should the ExecNode use?
> >>>>>>>>>>>>
> >>>>>>>>>>>> It is not the responsibility of the ExecNode to decide this but the operator. Something like:
> >>>>>>>>>>>>
> >>>>>>>>>>>> class X extends ProcessFunction {
> >>>>>>>>>>>>   ValueState<A> oldStateLayout;
> >>>>>>>>>>>>   ValueState<B> newStateLayout;
> >>>>>>>>>>>>
> >>>>>>>>>>>>   open() {
> >>>>>>>>>>>>     if (oldStateLayout.get() != null) {
> >>>>>>>>>>>>       performOperatorMigration();
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>     useNewStateLayout();
> >>>>>>>>>>>>   }
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> Operator migration is meant for smaller, "more local" changes without touching the ExecNode layer. The CEP library and DataStream API sources have been performing operator migration for years already.
> >>>>>>>>>>>>
> >>>>>>>>>>>> > `supportedPlanChanges` and `supportedSavepointChanges` are a bit obscure.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let me try to come up with more examples of why I think both annotations make sense and are esp. important *for test coverage*.
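The operator-migration pattern in the `ProcessFunction` pseudocode above can be illustrated without any Flink dependency as follows. This is a minimal sketch under stated assumptions: plain maps stand in for `ValueState`, and `MigratingOperator` and both layouts (epoch millis vs. ISO-8601 string) are hypothetical.

```java
import java.time.Instant;
import java.util.Map;

// Stand-in for an operator that declares BOTH state layouts and migrates
// eagerly in open(), as described in the email above.
class MigratingOperator {
    final Map<String, Long> oldStateLayout;    // old layout: epoch millis
    final Map<String, String> newStateLayout;  // new layout: ISO-8601 string

    MigratingOperator(Map<String, Long> oldState, Map<String, String> newState) {
        this.oldStateLayout = oldState;
        this.newStateLayout = newState;
    }

    /** Mirrors ProcessFunction#open(): migrate old entries, then drop them. */
    void open() {
        for (Map.Entry<String, Long> e : oldStateLayout.entrySet()) {
            newStateLayout.putIfAbsent(
                    e.getKey(), Instant.ofEpochMilli(e.getValue()).toString());
        }
        // After migration only the new layout is used; the old state is cleared
        // so the next savepoint contains only the new layout.
        oldStateLayout.clear();
    }
}
```

The key property is that the migration is invisible to the ExecNode layer: the topology and the ExecNode version stay unchanged, only the operator touches both layouts.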
> >>>>>>>>>>>>
> >>>>>>>>>>>> supportedPlanChanges:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let's assume we have some JSON in Flink 1.15:
> >>>>>>>>>>>>
> >>>>>>>>>>>> {
> >>>>>>>>>>>>   some-prop: 42
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> And we want to extend the JSON in Flink 1.16:
> >>>>>>>>>>>>
> >>>>>>>>>>>> {
> >>>>>>>>>>>>   some-prop: 42,
> >>>>>>>>>>>>   some-flag: false
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> Maybe we don't need to increase the ExecNode version but only ensure that the flag is set to `false` by default for the older versions.
> >>>>>>>>>>>>
> >>>>>>>>>>>> We need a location to track changes and document the changelog. With the help of the annotation supportedPlanChanges = [1.15, 1.16] we can verify that we have tests for both JSON formats.
> >>>>>>>>>>>>
> >>>>>>>>>>>> And once we decide to drop the 1.15 format, we enforce plan migration, fill in the default value `false` in the old plans, and bump their JSON plan version to 1.16 or higher.
> >>>>>>>>>>>>
> >>>>>>>>>>>> > once the state layout is changed, the ExecNode version needs also be updated
> >>>>>>>>>>>>
> >>>>>>>>>>>> This will still be the majority of cases. But if we can avoid this, we should do it so as not to have too much duplicate code to maintain.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 06.12.21 09:58, godfrey he wrote:
> >>>>>>>>>>>>> Hi, Timo,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the detailed explanation.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> We change an operator state of B in Flink 1.16. We perform the change in the operator of B in a way to support both state layouts.
Thus, no need for a new ExecNode version.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think this design makes things more complex.
> >>>>>>>>>>>>> 1. If there are multiple state layouts, which layout should the ExecNode use?
> >>>>>>>>>>>>> It increases the cost of understanding for developers (especially for Flink newcomers), making them prone to mistakes.
> >>>>>>>>>>>>> 2. `supportedPlanChanges` and `supportedSavepointChanges` are a bit obscure.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The purpose of the ExecNode annotations is not only to support powerful validation, but more importantly to make it easy for developers to understand, to ensure that every modification is easy and state compatible.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I prefer that once the state layout is changed, the ExecNode version also needs to be updated, which could make things simple. How about renaming `supportedPlanChanges` to `planCompatibleVersion` (which means the plan is compatible with the plan generated by the given version of the node) and renaming `supportedSavepointChanges` to `savepointCompatibleVersion` (which means the state is compatible with the state generated by the given version of the node)?
> >>>>>>>>>>>>> The names also indicate that only one version value can be set.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> WDYT?
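The default-value approach for evolving the JSON plan format (the `some-prop`/`some-flag` example earlier in this thread) boils down to a tiny migration step. The sketch below uses plain maps in place of JSON nodes; `PlanMigration.migrateNode` is a hypothetical helper for illustration, not part of the FLIP.

```java
import java.util.HashMap;
import java.util.Map;

final class PlanMigration {
    /**
     * Migrates a single plan node from the 1.15 format to the 1.16 format:
     * a property missing in old plans is filled with its documented default.
     */
    static Map<String, Object> migrateNode(Map<String, Object> node) {
        Map<String, Object> migrated = new HashMap<>(node);
        // 1.15 plans lack `some-flag`; 1.16 defines it with default `false`
        migrated.putIfAbsent("some-flag", false);
        return migrated;
    }
}
```

After running this over every node, the plan's global version can be bumped to 1.16 and the old `supportedPlanFormat` entry can be dropped, which is exactly what forced plan migration achieves.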
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Godfrey
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Timo Walther <twal...@apache.org> wrote on Thu, 2 Dec 2021 at 23:42:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Response to Marios's feedback:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> > there should be some good logging in place when the upgrade is taking place
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Yes, I agree. I added this part to the FLIP.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> > config option instead that doesn't provide the flexibility to overwrite certain plans
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> One can also set the config option around sections of the multi-statement SQL script:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> SET 'table.plan.force-recompile'='true';
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> COMPILE ...
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> SET 'table.plan.force-recompile'='false';
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> But the question is why a user wants to run COMPILE multiple times. If it is during development, then running EXECUTE (or just the statement itself) without calling COMPILE should be sufficient. The file can also be deleted manually if necessary.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 02.12.21 16:09, Timo Walther wrote:
> >>>>>>>>>>>>>>> Hi Till,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yes, you might have to. But it would not be a new plan from the SQL query, rather a migration from the old plan to the new plan. This will not happen often.
> >>>>>>>>>>>>>>> But we need a way to evolve the format of the JSON plan itself.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Maybe this confuses a bit, so let me clarify it again: mostly ExecNode versions and operator state layouts will evolve, not the plan files; those will be pretty stable. But also not infinitely.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 02.12.21 16:01, Till Rohrmann wrote:
> >>>>>>>>>>>>>>>> Then for migrating from Flink 1.10 to 1.12, I might have to create a new plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Response to Till's feedback:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> > compiled plan won't be changed after being written initially
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This is not entirely correct. We give guarantees for keeping the query up and running. We reserve the right to force plan migrations. In this case, the plan might not be created from the SQL statement but from the old plan. I have added an example in section 10.1.1. In general, both persisted entities, "plan" and "savepoint", can evolve independently of each other.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 02.12.21 15:10, Timo Walther wrote:
> >>>>>>>>>>>>>>>>>> Response to Godfrey's feedback:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for the hint. I added a dedicated section 7.1.3.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> > it's hard to maintain the supported versions for "supportedPlanChanges" and "supportedSavepointChanges"
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Actually, I think we are mostly on the same page.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The annotation does not need to be updated for every Flink version. As the name suggests, it is about "changes" (in other words: incompatibilities) that require some kind of migration. Either plan migration (= PlanChanges) or savepoint migration (= SavepointChanges, using operator migration or savepoint migration).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Let's assume we introduced two ExecNodes A and B in Flink 1.15.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The annotations are:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>>>>>>   supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>>>>>>   supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> We change an operator state of B in Flink 1.16.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> We perform the change in the operator of B in a way to support both state layouts. Thus, no need for a new ExecNode version.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The annotations in 1.16 are:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>>>>>>   supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>>>>>>   supportedSavepointChanges=1.15, 1.16)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> So the versions in the annotations are "start versions".
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I don't think we need end versions? An end version would mean that we drop the ExecNode from the code base.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Please check section 10.1.1 again. I added a more complex example.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 01.12.21 16:29, Timo Walther wrote:
> >>>>>>>>>>>>>>>>>>> Response to Francesco's feedback:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> > *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan must fail hard
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Yes, I totally agree. We will fail hard with a helpful exception. Any mistake, e.g. using an inline object in Table API or an invalid DataStream API source without a uid, should immediately fail a plan compilation step. I added a remark to the FLIP again.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> > What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Breaking changes in connectors and formats need to be encoded in the options. I could also imagine versioning the factory identifier, e.g. `connector=kafka` and `connector=kafka-2`, if this is necessary.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> After thinking about your question again, I think we will also need the same testing infrastructure for our connectors and formats, esp. restore tests and completeness tests. I updated the document accordingly. Also, I added a way to generate UIDs for DataStream API providers.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> > *Functions:* Are we talking about the function name or the function's complete signature?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For catalog functions, the identifier contains the catalog name and database name. For system functions, the identifier contains only a name, which makes function name and identifier identical. I reworked the section again and also fixed some of the naming conflicts you mentioned.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> > we should perhaps use a logically defined unique id like /bigIntToTimestamp/
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I added a concrete example for the resolution and restoration.
> >>>>>>>>>>>>>>>>>>> The unique id is composed of name + version. Internally, this is represented as `$TO_TIMESTAMP_LTZ$1`.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> > I think we should rather keep JSON out of the concept
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Sounds OK to me. In SQL we also just call it "plan". I will change the file sections, but I would suggest keeping the `fromJsonString` method.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> > write it back in the original plan file
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I updated the terminology section for what we consider an "upgrade". We might need to update the original plan file. This is already considered in the COMPILE PLAN ... FROM ... even though this is future work. Also savepoint migration.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for all the feedback!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 30.11.21 14:28, Timo Walther wrote:
> >>>>>>>>>>>>>>>>>>>> Response to Wenlong's feedback:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> > I would prefer not to provide such a shortcut; let users use COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by new users even without referring to the docs.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I would like to hear more opinions on this topic. Personally, I find a combined statement very useful. Not only for quicker development and debugging but also for readability.
> >>>>>>>>>>>>>>>>>>>> It helps in keeping the JSON path and the query close to each other in order to know the origin of the plan.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> > but the plan and SQL are not matched. The result would be quite confusing if we still execute the plan directly, we may need to add a validation.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> You are right that there could be a mismatch. But we have a similar problem when executing CREATE TABLE IF NOT EXISTS. The schema or options of a table could have changed completely in the catalog but the CREATE TABLE IF NOT EXISTS is not executed again. So a mismatch could also occur there.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 30.11.21 14:17, Timo Walther wrote:
> >>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> thanks for the feedback so far. Let me answer each email individually.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I will start with a response to Ingo's feedback:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> > Will the JSON plan's schema be considered an API?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> No, not in the first version. This is explicitly mentioned in the `General JSON Plan Assumptions`. I tried to improve the section once more to make it clearer. However, the JSON plan is definitely stable per minor version.
> >>>>>>>>>>>>>>>>>>>>> And since the plan is versioned by Flink version, external tooling could be built around it. We might make it public API once the design has settled.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> > Given that upgrades across multiple versions at once are unsupported, do we verify this somehow?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Good question. I extended the `General JSON Plan Assumptions`. Now yes: the Flink version is part of the JSON plan and will be verified during restore. But keep in mind that we might support more than just the last version, at least until the JSON plan has been migrated.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 30.11.21 09:39, Marios Trivyzas wrote:
> >>>>>>>>>>>>>>>>>>>>>> I have a question regarding `COMPILE PLAN OVERWRITE`. If we choose to go with the config option instead, that doesn't provide the flexibility to overwrite certain plans but not others, since the config applies globally. Isn't that something to consider?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Timo!
> >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for taking all that time and effort to > >> put > >>>>>>>>>>>> together > >>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>>> proposal! > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Regarding: > >>>>>>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that > >>>>>> upgrades > >>>>>>>>>>>> use a > >>>>>>>>>>>>>>>>>>>>>>>> step size > >>>>>>>>>>>>>>>>>>>>>>> of a single > >>>>>>>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor > >>>>>> versions > >>>>>>>>>>>> (e.g. > >>>>>>>>>>>>>>>>>>>>>>> 1.11 to > >>>>>>>>>>>>>>>>>>>>>>> 1.14). > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> I think that for this first step we should make it > >>>>>>>>> absolutely > >>>>>>>>>>>>>>>>>>>>>>> clear to the > >>>>>>>>>>>>>>>>>>>>>>> users that they would need to go through all > >>>>>> intermediate > >>>>>>>>>>>> versions > >>>>>>>>>>>>>>>>>>>>>>> to end up with the target version they wish. If we > >> are > >>>>>> to > >>>>>>>>>>>> support > >>>>>>>>>>>>>>>>>>>>>>> skipping > >>>>>>>>>>>>>>>>>>>>>>> versions in the future, i.e. upgrade from 1.14 to > >> 1.17, > >>>>>> this > >>>>>>>>>>>> means > >>>>>>>>>>>>>>>>>>>>>>> that we need to have a testing infrastructure in > >> place > >>>>>> that > >>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>>>>>>> test all > >>>>>>>>>>>>>>>>>>>>>>> possible combinations of version upgrades, i.e. from > >>>>>> 1.14 to > >>>>>>>>>>>> 1.15, > >>>>>>>>>>>>>>>>>>>>>>> from 1.14 to 1.16 and so forth, while still testing > >> and > >>>>>> of > >>>>>>>>>>>> course > >>>>>>>>>>>>>>>>>>>>>>> supporting all the upgrades from the previous minor > >>>>>> version. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> I like a lot the idea of introducing HINTS to define > >>>>>> some > >>>>>>>>>>>>>>>>>>>>>>> behaviour in the > >>>>>>>>>>>>>>>>>>>>>>> programs! 
> >>>>>>>>>>>>>>>>>>>>>>> - the hints live together with the sql statements and > >>>>>>>>>>>> consequently > >>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> (JSON) plans. > >>>>>>>>>>>>>>>>>>>>>>> - If multiple queries are involved in a program, each > >>>>>> one of > >>>>>>>>>>>> them > >>>>>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>>>>>>> define its own config (regarding plan optimisation, > >> not > >>>>>> null > >>>>>>>>>>>>>>>>>>>>>>> enforcement, > >>>>>>>>>>>>>>>>>>>>>>> etc) > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> I agree with Francesco on his argument regarding the > >>>>>> *JSON* > >>>>>>>>>>>>>>>>>>>>>>> plan. I > >>>>>>>>>>>>>>>>>>>>>>> believe we should already provide flexibility here, > >>>>>> since > >>>>>>>>> (who > >>>>>>>>>>>>>>>>>>>>>>> knows) in > >>>>>>>>>>>>>>>>>>>>>>> the future > >>>>>>>>>>>>>>>>>>>>>>> a JSON plan might not fulfil the desired > >> functionality. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> I also agree that we need some very obvious way (i.e. > >>>>>> not > >>>>>>>>> log > >>>>>>>>>>>>>>>>>>>>>>> entry) to > >>>>>>>>>>>>>>>>>>>>>>> show the users that their program doesn't support > >>>>>> version > >>>>>>>>>>>>>>>>>>>>>>> upgrades, and > >>>>>>>>>>>>>>>>>>>>>>> prevent them from being negatively surprised in the > >>>>>> future, > >>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>>>>>> trying to > >>>>>>>>>>>>>>>>>>>>>>> upgrade their production pipelines. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> This is an implementation detail, but I'd like to add > >>>>>> that > >>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>>>>>> should be > >>>>>>>>>>>>>>>>>>>>>>> some good logging in place when the upgrade is taking > >>>>>> place, > >>>>>>>>>>>> to be > >>>>>>>>>>>>>>>>>>>>>>> able to track every restoration action, and help > >> debug > >>>>>> any > >>>>>>>>>>>>>>>>>>>>>>> potential > >>>>>>>>>>>>>>>>>>>>>>> issues arising from that. 
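The restore-time check discussed in this thread (the Flink version is part of the JSON plan and verified during restore, with only a bounded window of previous minor versions supported and a cascade of single-minor upgrades otherwise) could be sketched roughly as follows. This is a minimal Python illustration, not Flink code; the `flinkVersion` field name comes from the FLIP, while `check_restorable` and the two-minor window are assumptions for the example:

```python
import json

# Hypothetical supported window: the restoring version plus the two
# previous minors (the "last 2-3 releases" best-effort policy).
SUPPORTED_STEPS_BACK = 2

def parse_minor(version):
    """Split a 'major.minor' Flink version string into integers."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)

def check_restorable(plan_json, current_version="1.17"):
    """Fail hard when the plan was compiled by an unsupported version."""
    plan = json.loads(plan_json)
    plan_major, plan_minor = parse_minor(plan["flinkVersion"])
    cur_major, cur_minor = parse_minor(current_version)
    steps = cur_minor - plan_minor
    if plan_major != cur_major or not 0 <= steps <= SUPPORTED_STEPS_BACK:
        raise ValueError(
            f"Plan compiled with Flink {plan['flinkVersion']} cannot be "
            f"restored by {current_version}; run plan migration on an "
            f"intermediate version first.")
    return plan

# Within the window: accepted. Outside it: the caller must migrate.
plan = check_restorable('{"flinkVersion": "1.15", "nodes": []}')
```

A plan compiled by 1.14 would be rejected by 1.17 under this policy, forcing the cascade of single-minor upgrades described above.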
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for writing this FLIP Timo. I think this will be a very important improvement for Flink and our SQL users :-)
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Similar to Francesco I would like to understand the statement
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> a bit better. Is it because Flink does not guarantee that a savepoint created by version 1.x can be directly recovered by version 1.y with x + 1 < y, but users might have to go through a cascade of upgrades? From how I understand your proposal, the compiled plan won't be changed after being written initially. Hence, I would assume that for the plan alone Flink will have to give backwards compatibility guarantees for all versions. Am I understanding this part correctly?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <france...@ververica.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for putting this amazing work together, I have some considerations/questions about the FLIP:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with objects that cannot be serialized in the plan fails hard, so users don't bite themselves with such issues, or at least we need to output warning logs.
> >>>>>>>>>>>>>>>>>>>>>>>>> In general, whenever the user is trying to use the CompiledPlan APIs and at the same time they're trying to do something "illegal" for the plan, we should immediately either log or fail depending on the issue, in order to avoid any surprises once the user upgrades. I would also say the same for things like registering a function, registering a DataStream, and every other thing which won't end up in the plan; we should log such info to the user by default.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> *General JSON Plan Assumptions #9:* When thinking of connectors and formats, I think it's reasonable to assume, and keep out of the feature design, that no feature/ability can be deleted from a connector/format. I also don't think new features/abilities can influence this FLIP, given the plan is static: if, for example, MyCoolTableSink in the next flink version implements SupportsProjectionsPushDown, then it shouldn't be a problem for the upgrade story, since the plan is still configured as computed from the previous flink version. What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats. Although this argument doesn't seem relevant for the connectors shipped by the flink project itself, because we try to keep them as stable as possible and avoid any breaking changes, it is a real concern for external connectors and formats, which might be decoupled from the flink release cycle and might have different backward compatibility guarantees.
It's totally > >>>>>> reasonable > >>>>>>>>>>>> if we > >>>>>>>>>>>>>>>>>>>>>>>>> don't > >>>>>>>>>>>>>>>>>>>>>>>>> want to tackle it in > >>>>>>>>>>>>>>>>>>>>>>>>> this first iteration of the feature, but it's > >>>>>> something we > >>>>>>>>>>>> need > >>>>>>>>>>>>>>>>>>>>>>>>> to keep > >>>>>>>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>>>> mind for the future. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> *Functions:* It's not clear to me what you mean for > >>>>>>>>>>>>>>>>>>>>>>>>> "identifier", > >>>>>>>>>>>>>>>>>>>>>>>> because > >>>>>>>>>>>>>>>>>>>>>>>>> then somewhere > >>>>>>>>>>>>>>>>>>>>>>>>> else in the same context you talk about "name". Are > >>>> we > >>>>>>>>>>>> talking > >>>>>>>>>>>>>>>>>>>>>>>>> about the > >>>>>>>>>>>>>>>>>>>>>>>>> function name > >>>>>>>>>>>>>>>>>>>>>>>>> or the function complete signature? Let's assume > >> for > >>>>>>>>>>>> example we > >>>>>>>>>>>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>>>> these > >>>>>>>>>>>>>>>>>>>>>>>>> function > >>>>>>>>>>>>>>>>>>>>>>>>> definitions: > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(BIGINT) > >>>>>>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING) > >>>>>>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING, STRING) > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> These for me are very different functions with > >>>>>> different > >>>>>>>>>>>>>>>>>>>>>>>> implementations, > >>>>>>>>>>>>>>>>>>>>>>>>> where each of > >>>>>>>>>>>>>>>>>>>>>>>>> them might evolve separately at a different pace. > >>>>>> Hence > >>>>>>>>>>>> when we > >>>>>>>>>>>>>>>>>>>>>>>>> store > >>>>>>>>>>>>>>>>>>>>>>>> them > >>>>>>>>>>>>>>>>>>>>>>>>> in the json > >>>>>>>>>>>>>>>>>>>>>>>>> plan we should perhaps use a logically defined > >> unique > >>>>>> id > >>>>>>>>>>>> like > >>>>>>>>>>>>>>>>>>>>>>>>> /bigIntToTimestamp/, / > >>>>>>>>>>>>>>>>>>>>>>>>> stringToTimestamp/ and > >> /stringToTimestampWithFormat/. 
> >>>>>> This > >>>>>>>>>>>> also > >>>>>>>>>>>>>>>>>>>>>>>>> solves > >>>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> issue of > >>>>>>>>>>>>>>>>>>>>>>>>> correctly referencing the functions when restoring > >>>> the > >>>>>>>>> plan, > >>>>>>>>>>>>>>>>>>>>>>>>> without > >>>>>>>>>>>>>>>>>>>>>>>>> running again the > >>>>>>>>>>>>>>>>>>>>>>>>> inference logic (which might have been changed in > >> the > >>>>>>>>>>>> meantime) > >>>>>>>>>>>>>>>>>>>>>>>>> and it > >>>>>>>>>>>>>>>>>>>>>>>>> might also solve > >>>>>>>>>>>>>>>>>>>>>>>>> the versioning, that is the function identifier can > >>>>>>>>> contain > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> function > >>>>>>>>>>>>>>>>>>>>>>>>> version like / > >>>>>>>>>>>>>>>>>>>>>>>>> stringToTimestampWithFormat_1_1 /or > >>>>>>>>>>>>>>>>>>>>>>>>> /stringToTimestampWithFormat_1_2/. > >>>>>>>>>>>>>>>>>>>>>>>> An > >>>>>>>>>>>>>>>>>>>>>>>>> alternative could be to use the string signature > >>>>>>>>>>>> representation, > >>>>>>>>>>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>>>>>>> might not be trivial > >>>>>>>>>>>>>>>>>>>>>>>>> to compute, given the complexity of our type > >>>> inference > >>>>>>>>>>>> logic. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> *The term "JSON plan"*: I think we should rather > >> keep > >>>>>> JSON > >>>>>>>>>>>> out > >>>>>>>>>>>>>>>>>>>>>>>>> of the > >>>>>>>>>>>>>>>>>>>>>>>>> concept and just > >>>>>>>>>>>>>>>>>>>>>>>>> name it "Compiled Plan" (like the proposed API) or > >>>>>>>>> something > >>>>>>>>>>>>>>>>>>>>>>>>> similar, > >>>>>>>>>>>>>>>>>>>>>>>> as I > >>>>>>>>>>>>>>>>>>>>>>>>> see how in > >>>>>>>>>>>>>>>>>>>>>>>>> future we might decide to support/modify our > >>>>>> persistence > >>>>>>>>>>>>>>>>>>>>>>>>> format to > >>>>>>>>>>>>>>>>>>>>>>>>> something more > >>>>>>>>>>>>>>>>>>>>>>>>> efficient storage wise like BSON. 
For example, I > >>>> would > >>>>>>>>>>>> rename / > >>>>>>>>>>>>>>>>>>>>>>>>> CompiledPlan.fromJsonFile/ to simply > >>>>>>>>>>>> /CompiledPlan.fromFile/. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> *Who is the owner of the plan file?* I asked myself > >>>>>> this > >>>>>>>>>>>>>>>>>>>>>>>>> question when > >>>>>>>>>>>>>>>>>>>>>>>>> reading this: > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that > >>>>>> upgrades > >>>>>>>>>>>> use a > >>>>>>>>>>>>>>>>>>>>>>>>>> step > >>>>>>>>>>>>>>>>>>>>>>>>> size of a single > >>>>>>>>>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor > >>>>>> versions > >>>>>>>>>>>> (e.g. > >>>>>>>>>>>>>>>>>>>>>>>>> 1.11 to > >>>>>>>>>>>>>>>>>>>>>>>>> 1.14). > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> My understanding of this statement is that a user > >> can > >>>>>>>>>>>> upgrade > >>>>>>>>>>>>>>>>>>>>>>>>> between > >>>>>>>>>>>>>>>>>>>>>>>>> minors but then > >>>>>>>>>>>>>>>>>>>>>>>>> following all the minors, the same query can remain > >>>>>> up and > >>>>>>>>>>>>>>>>> running. > >>>>>>>>>>>>>>>>>>>>>>>> E.g. I > >>>>>>>>>>>>>>>>>>>>>>>>> upgrade from > >>>>>>>>>>>>>>>>>>>>>>>>> 1.15 to 1.16, and then from 1.16 to 1.17 and I > >> still > >>>>>>>>> expect > >>>>>>>>>>>> my > >>>>>>>>>>>>>>>>>>>>>>>>> original > >>>>>>>>>>>>>>>>>>>>>>>>> query to work > >>>>>>>>>>>>>>>>>>>>>>>>> without recomputing the plan. This necessarily > >> means > >>>>>> that > >>>>>>>>> at > >>>>>>>>>>>>>>>>>>>>>>>>> some point > >>>>>>>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>>>> future > >>>>>>>>>>>>>>>>>>>>>>>>> releases we'll need some basic "migration" tool to > >>>>>> keep > >>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> queries up > >>>>>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>> running, > >>>>>>>>>>>>>>>>>>>>>>>>> ending up modifying the compiled plan. 
So I guess > >>>>>> flink > >>>>>>>>>>>> should > >>>>>>>>>>>>>>>>>>>>>>>>> write it > >>>>>>>>>>>>>>>>>>>>>>>>> back in the original > >>>>>>>>>>>>>>>>>>>>>>>>> plan file, perhaps doing a backup of the previous > >>>>>> one? Can > >>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>>>>>>>>> please > >>>>>>>>>>>>>>>>>>>>>>>>> clarify this aspect? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Except these considerations, the proposal looks > >> good > >>>>>> to me > >>>>>>>>>>>>>>>>>>>>>>>>> and I'm > >>>>>>>>>>>>>>>>>>>>>>>> eagerly > >>>>>>>>>>>>>>>>>>>>>>>>> waiting to see > >>>>>>>>>>>>>>>>>>>>>>>>> it in play. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>>>>>>> FG > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>>>>>>>>>> Francesco Guardiani | Software Engineer > >>>>>>>>>>>>>>>>>>>>>>>>> france...@ververica.com[1] > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Follow us @VervericaData > >>>>>>>>>>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>>>>>>>>>> Join Flink Forward[2] - The Apache Flink Conference > >>>>>>>>>>>>>>>>>>>>>>>>> Stream Processing | Event Driven | Real Time > >>>>>>>>>>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>>>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 > >> Berlin, > >>>>>>>>> Germany > >>>>>>>>>>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>>>>>>>>>> Ververica GmbH > >>>>>>>>>>>>>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB > >> 158244 > >>>> B > >>>>>>>>>>>>>>>>>>>>>>>>> Managing Directors: Karl Anton Wehner, Holger > >> Temme, > >>>>>> Yip > >>>>>>>>>>>> Park > >>>>>>>>>>>>>>>>>>>>>>>>> Tung > >>>>>>>>>>>>>>>>>>>>>>>> Jason, > >>>>>>>>>>>>>>>>>>>>>>>>> Jinwei (Kevin) > >>>>>>>>>>>>>>>>>>>>>>>>> Zhang > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> -------- > >>>>>>>>>>>>>>>>>>>>>>>>> [1] mailto:france...@ververica.com > >>>>>>>>>>>>>>>>>>>>>>>>> [2] https://flink-forward.org/ > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > 
>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>> Marios
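The system function identifier scheme discussed in this thread (a unique id composed of name + version, internally represented as `$TO_TIMESTAMP_LTZ$1`) can be sketched as follows. This is a hypothetical Python illustration of the encoding only, not Flink's implementation; the helper names are invented for the example:

```python
def to_internal_id(name, version):
    """Compose the internal unique id from a system function name and its
    version, e.g. ('TO_TIMESTAMP_LTZ', 1) -> '$TO_TIMESTAMP_LTZ$1'."""
    return f"${name}${version}"

def from_internal_id(internal_id):
    """Split an internal id like '$TO_TIMESTAMP_LTZ$1' back into
    (name, version) so a restored plan can resolve the exact function
    version without re-running type inference."""
    _, name, version = internal_id.split("$")
    return name, int(version)
```

Encoding the version in the identifier is what allows, e.g., `$TO_TIMESTAMP_LTZ$1` to keep resolving to the old behaviour even after a later Flink version introduces `$TO_TIMESTAMP_LTZ$2`.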