Hi, Timo, +1 for multiple metadata annotations.
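
For reference, a minimal sketch of how multiple metadata annotations on one
ExecNode class could look using Java's @Repeatable. The annotation fields
follow your proposal; the container annotation MultipleExecNodeMetadata, the
class ExecNodeA and the helper MetadataLookup are hypothetical names made up
for illustration and are not part of the FLIP:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Fields mirror the proposal in this thread; this is an illustrative
// stand-in, not Flink's actual annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Repeatable(MultipleExecNodeMetadata.class)
@interface ExecNodeMetadata {
    String name();
    int version();
    String[] supportedPlanFormat();
    String supportedSavepointFormat();
}

// Hypothetical container annotation, required by @Repeatable.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface MultipleExecNodeMetadata {
    ExecNodeMetadata[] value();
}

// One class carries both versions, so no ExecNode code has to be copied.
@ExecNodeMetadata(name = "A", version = 1,
        supportedPlanFormat = "1.15", supportedSavepointFormat = "1.15")
@ExecNodeMetadata(name = "A", version = 2,
        supportedPlanFormat = "1.15", supportedSavepointFormat = "1.16")
final class ExecNodeA {}

// Hypothetical helper: finds the newest ExecNode version declared on a class.
final class MetadataLookup {
    static int latestVersion(Class<?> nodeClass) {
        int latest = -1;
        for (ExecNodeMetadata metadata
                : nodeClass.getAnnotationsByType(ExecNodeMetadata.class)) {
            latest = Math.max(latest, metadata.version());
        }
        return latest;
    }
}
```

With something like this, new plans could pick the latest declared version
while old plans that reference version 1 still resolve to the same class.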

The compatible change I meant in my last email is the slight state change
example you gave, so IMO we actually have consensus on this already.

Another question based on the example you gave:
In the example "JSON node gets an additional property in 1.16", if we don't
update the version when the plan format changes, we have no way to detect
that the plan cannot be deserialized in 1.15, even though the savepoint
state is compatible.
The error message would not be very friendly if we just throw a
deserialization failure.
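
To illustrate what I mean by a friendlier error, here is a minimal sketch,
assuming the plan records the plan format it was written with and that this
value can be read before the node itself is deserialized. All names
(PlanFormatCheck, checkPlanFormat, UnsupportedPlanFormatException) are
hypothetical and not part of the FLIP:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Flink API: compare the plan format recorded in
// the plan with the formats the node supports before deserializing it.
final class PlanFormatCheck {

    /** Hypothetical exception type used only in this sketch. */
    static final class UnsupportedPlanFormatException extends RuntimeException {
        UnsupportedPlanFormatException(String message) {
            super(message);
        }
    }

    /**
     * Throws a readable error if the plan was written in a format that the
     * running Flink version does not list in supportedPlanFormat.
     */
    static void checkPlanFormat(
            String nodeName, String planFormat, List<String> supportedPlanFormats) {
        if (!supportedPlanFormats.contains(planFormat)) {
            throw new UnsupportedPlanFormatException(
                    String.format(
                            "Cannot restore ExecNode '%s': the plan uses plan format %s, "
                                    + "but this Flink version only supports plan formats %s.",
                            nodeName, planFormat, supportedPlanFormats));
        }
    }

    public static void main(String[] args) {
        // A 1.15 runtime that only knows the 1.15 plan format.
        List<String> supported = Arrays.asList("1.15");
        checkPlanFormat("A", "1.15", supported); // passes silently
        try {
            checkPlanFormat("A", "1.16", supported);
        } catch (UnsupportedPlanFormatException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The check only works if the format of the plan is visible in the plan itself,
which is why I think the plan change should be reflected in some version.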

On Tue, 7 Dec 2021 at 16:49, Timo Walther <twal...@apache.org> wrote:

> Hi Wenlong,
>
>  > First, we add a newStateLayout because of some improvement in state, in
>  > order to keep compatibility we may still keep the old state for the
>  > first version. We need to update the version, so that we can generate a
>  > new version plan for the new job and keep the exec node compatible with
>  > the old version plan.
>
> The problem that I see here for contributors is that the actual update
> of a version is more complicated than just updating an integer value. It
> means copying a lot of ExecNode code for a change that happens locally
> in an operator. Let's assume multiple ExecNodes use a similar operator.
> Why do we need to update all ExecNode versions if the operator itself
> can deal with the incompatibility? The ExecNode version is meant for
> topology changes or fundamental state changes.
>
> If we don't find consensus on this topic, I would at least vote for
> supporting multiple annotations for an ExecNode class. This way we don't
> need to copy code but only add two ExecNode annotations with different
> ExecNode versions.
>
>  > Maybe we can add support for this case :
>  > when an exec node is changed in 1.16, but is compatible with 1.15,
>  > we can use the node of 1.16 to deserialize the plan of 1.15.
>
> If the ExecNode is compatible, there is no reason to increase the
> ExecNode version.
>
>
>
> I tried to come up with a reworked solution to make all parties happy:
>
> 1. Let's assume the following annotations:
>
> supportedPlanFormat = [1.15]
>
> supportedSavepointFormat = 1.15
>
> we drop `added` as it is equal to `supportedSavepointFormat`
>
> 2. Multiple annotations over ExecNodes are possible:
>
> // operator state changes
>
> // initial introduction in 1.15
> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> supportedSavepointFormat=1.15)
>
> // state layout changed slightly in 1.16
> // - operator migration is possible
> // - operator supports state of both versions and will perform operator
> //   state migration
> // - new plans will get new ExecNode version
> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> supportedSavepointFormat=1.15)
> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> supportedSavepointFormat=1.16)
>
> // we force a plan migration in 1.17
> // - we assume that all operator states have been migrated in the
> //   previous version
> // - we can safely replace the old version `1` with `2` and only keep
> //   the new savepoint format
> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> supportedSavepointFormat=1.16)
>
>
> // plan changes
>
> // initial introduction in 1.15
> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> supportedSavepointFormat=1.15)
>
> // JSON node gets an additional property in 1.16
> // e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false}
> // - ExecNode version does not change
> // - ExecNode version only changes when topology or state is affected
> // - we support both JSON plan formats, the old and the newest one
> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16],
> supportedSavepointFormat=1.15)
>
> // we force a plan migration in 1.17
> // - now we only support 1.16 plan format
> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16,
> supportedSavepointFormat=1.15)
>
>
> // topology change
>
> // initial introduction in 1.15
> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> supportedSavepointFormat=1.15)
>
> // complete new class structure in 1.16 annotated with
> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> supportedSavepointFormat=1.16)
>
>
>
> What do you think?
>
>
> Regards,
> Timo
>
> On 07.12.21 08:20, wenlong.lwl wrote:
> > Maybe we can add support for this case:
> > when an exec node is changed in 1.16, but is compatible with 1.15,
> > we can use the node of 1.16 to deserialize the plan of 1.15.
> > This way, we don't need to fork the code if the change is compatible,
> > and can avoid forking code frequently.
> >
> >
> > Best,
> > Wenlong
> >
> >
> > On Tue, 7 Dec 2021 at 15:08, wenlong.lwl <wenlong88....@gmail.com> wrote:
> >
> >> Hi Timo, I would prefer to update the version every time we change the
> >> state layout too.
> >>
> >> It could be possible that we change the exec node in 2 steps:
> >> First, we add a newStateLayout because of some improvement in state. In
> >> order to keep compatibility, we may still keep the old state for the
> >> first version. We need to update the version, so that we can generate a
> >> new version plan for the new job and keep the exec node compatible with
> >> the old version plan.
> >> After some versions, we may remove the old version state layout and
> >> clean up the deprecated code. We still need to update the version, so
> >> that we can verify that we are compatible with the plan after the first
> >> change, but not compatible with the earlier plan.
> >>
> >>
> >> Best,
> >> Wenlong
> >>
> >> On Mon, 6 Dec 2021 at 21:27, Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi Godfrey,
> >>>
> >>>   > design makes things more complex.
> >>>
> >>> Yes, the design might be a bit more complex. But operator migration is
> >>> way easier than ExecNode migration at a later point in time for code
> >>> maintenance. We know that ExecNodes can become pretty complex. Even
> >>> though we have put a lot of code into `CommonXXExecNode` it will be a
> >>> lot of work to maintain multiple versions of ExecNodes. If we can avoid
> >>> this with operator state migration, this should always be preferred
> >>> over a new ExecNode version.
> >>>
> >>> I'm aware that operator state migration might only be important for
> >>> roughly 10% of all changes. A new ExecNode version will be used for
> >>> 90% of all changes.
> >>>
> >>>   > If there are multiple state layouts, which layout the ExecNode
> >>> should use?
> >>>
> >>> It is not the responsibility of the ExecNode to decide this but the
> >>> operator. Something like:
> >>>
> >>> class X extends ProcessFunction {
> >>>     ValueState<A> oldStateLayout;
> >>>     ValueState<B> newStateLayout;
> >>>
> >>>     open() {
> >>>       if (oldStateLayout.get() != null) {
> >>>         performOperatorMigration();
> >>>       }
> >>>       useNewStateLayout();
> >>>     }
> >>> }
> >>>
> >>> Operator migration is meant for smaller "more local" changes without
> >>> touching the ExecNode layer. The CEP library and DataStream API sources
> >>> have been performing operator migration for years already.
> >>>
> >>>
> >>>   > `supportedPlanChanges ` and `supportedSavepointChanges ` are a bit
> >>> obscure.
> >>>
> >>> Let me try to come up with more examples why I think both annotations
> >>> make sense and are esp. important *for test coverage*.
> >>>
> >>> supportedPlanChanges:
> >>>
> >>> Let's assume we have some JSON in Flink 1.15:
> >>>
> >>> {
> >>>     some-prop: 42
> >>> }
> >>>
> >>> And we want to extend the JSON in Flink 1.16:
> >>>
> >>> {
> >>>     some-prop: 42,
> >>>     some-flag: false
> >>> }
> >>>
> >>> Maybe we don't need to increase the ExecNode version but only ensure
> >>> that the flag is set to `false` by default for the older versions.
> >>>
> >>> We need a location to track changes and document the changelog. With
> >>> the help of the annotation supportedPlanChanges = [1.15, 1.16] we can
> >>> verify that we have tests for both JSON formats.
> >>>
> >>> And once we decide to drop the 1.15 format, we enforce plan migration
> >>> and fill-in the default value `false` into the old plans and bump their
> >>> JSON plan version to 1.16 or higher.
> >>>
> >>>
> >>>
> >>>   > once the state layout is changed, the ExecNode version needs also
> >>> be updated
> >>>
> >>> This will still be the majority of cases. But if we can avoid this, we
> >>> should do so in order not to have too much duplicate code to maintain.
> >>>
> >>>
> >>>
> >>> Thanks,
> >>> Timo
> >>>
> >>>
> >>> On 06.12.21 09:58, godfrey he wrote:
> >>>> Hi, Timo,
> >>>>
> >>>> Thanks for the detailed explanation.
> >>>>
> >>>>> We change an operator state of B in Flink 1.16. We perform the change
> >>>>> in the operator of B in a way to support both state layouts. Thus, no
> >>>>> need for a new ExecNode version.
> >>>>
> >>>> I think this design makes things more complex.
> >>>> 1. If there are multiple state layouts, which layout should the
> >>>> ExecNode use?
> >>>> It increases the cost of understanding for developers (especially for
> >>>> newcomers to Flink), making them prone to mistakes.
> >>>> 2. `supportedPlanChanges` and `supportedSavepointChanges` are a bit
> >>>> obscure.
> >>>>
> >>>> The purpose of ExecNode annotations is not only to support powerful
> >>>> validation, but more importantly to make it easy for developers to
> >>>> understand them and to ensure that every modification is easy and
> >>>> state compatible.
> >>>>
> >>>> I prefer that, once the state layout is changed, the ExecNode version
> >>>> is also updated, which would keep things simple. How about renaming
> >>>> `supportedPlanChanges` to `planCompatibleVersion`
> >>>> (which means the plan is compatible with the plan generated by the
> >>>> given version node)
> >>>> and renaming `supportedSavepointChanges` to `savepointCompatibleVersion`
> >>>> (which means the state is compatible with the state generated by the
> >>>> given version node)?
> >>>> The names also indicate that only one version value can be set.
> >>>>
> >>>> WDYT?
> >>>>
> >>>> Best,
> >>>> Godfrey
> >>>>
> >>>> On Thu, Dec 2, 2021 at 11:42 PM, Timo Walther <twal...@apache.org> wrote:
> >>>>>
> >>>>> Response to Marios's feedback:
> >>>>>
> >>>>>    > there should be some good logging in place when the upgrade is
> >>>>> taking place
> >>>>>
> >>>>> Yes, I agree. I added this part to the FLIP.
> >>>>>
> >>>>>    > config option instead that doesn't provide the flexibility to
> >>>>> overwrite certain plans
> >>>>>
> >>>>> One can set the config option also around sections of the
> >>>>> multi-statement SQL script.
> >>>>>
> >>>>> SET 'table.plan.force-recompile'='true';
> >>>>>
> >>>>> COMPILE ...
> >>>>>
> >>>>> SET 'table.plan.force-recompile'='false';
> >>>>>
> >>>>> But the question is why a user wants to run COMPILE multiple times. If
> >>>>> it is during development, then running EXECUTE (or just the statement
> >>>>> itself) without calling COMPILE should be sufficient. The file can also
> >>>>> manually be deleted if necessary.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 02.12.21 16:09, Timo Walther wrote:
> >>>>>> Hi Till,
> >>>>>>
> >>>>>> Yes, you might have to. But not a new plan from the SQL query but a
> >>>>>> migration from the old plan to the new plan. This will not happen
> >>>>>> often. But we need a way to evolve the format of the JSON plan itself.
> >>>>>>
> >>>>>> Maybe this confuses a bit, so let me clarify it again: Mostly ExecNode
> >>>>>> versions and operator state layouts will evolve. Not the plan files,
> >>>>>> those will be pretty stable. But also not infinitely.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 02.12.21 16:01, Till Rohrmann wrote:
> >>>>>>> Then for migrating from Flink 1.10 to 1.12, I might have to create a
> >>>>>>> new plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12,
> >>>>>>> right?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Till
> >>>>>>>
> >>>>>>> On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Response to Till's feedback:
> >>>>>>>>
> >>>>>>>>     > compiled plan won't be changed after being written initially
> >>>>>>>>
> >>>>>>>> This is not entirely correct. We give guarantees for keeping the
> >>>>>>>> query up and running. We reserve the right to force plan migrations.
> >>>>>>>> In this case, the plan might not be created from the SQL statement
> >>>>>>>> but from the old plan. I have added an example in section 10.1.1. In
> >>>>>>>> general, both persisted entities "plan" and "savepoint" can evolve
> >>>>>>>> independently from each other.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 02.12.21 15:10, Timo Walther wrote:
> >>>>>>>>> Response to Godfrey's feedback:
> >>>>>>>>>
> >>>>>>>>>     > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is
> >>>>>>>>> missing.
> >>>>>>>>>
> >>>>>>>>> Thanks for the hint. I added a dedicated section 7.1.3.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>     > it's hard to maintain the supported versions for
> >>>>>>>>> "supportedPlanChanges" and "supportedSavepointChanges"
> >>>>>>>>>
> >>>>>>>>> Actually, I think we are mostly on the same page.
> >>>>>>>>>
> >>>>>>>>> The annotation does not need to be updated for every Flink
> >>>>>>>>> version. As the name suggests it is about "Changes" (in other words:
> >>>>>>>>> incompatibilities) that require some kind of migration. Either plan
> >>>>>>>>> migration (= PlanChanges) or savepoint migration (= SavepointChanges,
> >>>>>>>>> using operator migration or savepoint migration).
> >>>>>>>>>
> >>>>>>>>> Let's assume we introduced two ExecNodes A and B in Flink 1.15.
> >>>>>>>>>
> >>>>>>>>> The annotations are:
> >>>>>>>>>
> >>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>
> >>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>
> >>>>>>>>> We change an operator state of B in Flink 1.16.
> >>>>>>>>>
> >>>>>>>>> We perform the change in the operator of B in a way to support
> both
> >>>>>>>>> state layouts. Thus, no need for a new ExecNode version.
> >>>>>>>>>
> >>>>>>>>> The annotations in 1.16 are:
> >>>>>>>>>
> >>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>
> >>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>> supportedSavepointChanges=[1.15, 1.16])
> >>>>>>>>>
> >>>>>>>>> So the versions in the annotations are "start version"s.
> >>>>>>>>>
> >>>>>>>>> I don't think we need end versions. An end version would mean that
> >>>>>>>>> we drop the ExecNode from the code base?
> >>>>>>>>>
> >>>>>>>>> Please check the section 10.1.1 again. I added a more complex
> >>>>>>>>> example.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Timo
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 01.12.21 16:29, Timo Walther wrote:
> >>>>>>>>>> Response to Francesco's feedback:
> >>>>>>>>>>
> >>>>>>>>>>     > *Proposed changes #6*: Other than defining this rule of
> >>>>>>>>>> thumb, we must also make sure that compiling plans with these
> >>>>>>>>>> objects that cannot be serialized in the plan must fail hard
> >>>>>>>>>>
> >>>>>>>>>> Yes, I totally agree. We will fail hard with a helpful exception.
> >>>>>>>>>> Any mistake, e.g. using an inline object in Table API or an invalid
> >>>>>>>>>> DataStream API source without uid, should immediately fail a plan
> >>>>>>>>>> compilation step. I added a remark to the FLIP again.
> >>>>>>>>>>
> >>>>>>>>>>     > What worries me is breaking changes, in particular
> >>>>>>>>>> behavioural changes that might happen in connectors/formats
> >>>>>>>>>>
> >>>>>>>>>> Breaking changes in connectors and formats need to be encoded in
> >>>>>>>>>> the options. I could also imagine versioning in the factory
> >>>>>>>>>> identifier, e.g. `connector=kafka` and `connector=kafka-2`, if this
> >>>>>>>>>> is necessary.
> >>>>>>>>>>
> >>>>>>>>>> After thinking about your question again, I think we will also need
> >>>>>>>>>> the same testing infrastructure for our connectors and formats,
> >>>>>>>>>> esp. restore tests and completeness tests. I updated the document
> >>>>>>>>>> accordingly. I also added a way to generate UIDs for DataStream API
> >>>>>>>>>> providers.
> >>>>>>>>>>
> >>>>>>>>>>     > *Functions:* Are we talking about the function name or the
> >>>>>>>>>> function complete signature?
> >>>>>>>>>>
> >>>>>>>>>> For catalog functions, the identifier contains catalog name and
> >>>>>>>>>> database name. For system functions, the identifier contains only a
> >>>>>>>>>> name, which makes function name and identifier identical. I
> >>>>>>>>>> reworked the section again and also fixed some of the naming
> >>>>>>>>>> conflicts you mentioned.
> >>>>>>>>>>
> >>>>>>>>>>     > we should perhaps use a logically defined unique id like
> >>>>>>>>>> /bigIntToTimestamp/
> >>>>>>>>>>
> >>>>>>>>>> I added a concrete example for the resolution and restoration.
> >>>>>>>>>> The unique id is composed of name + version. Internally, this is
> >>>>>>>>>> represented as `$TO_TIMESTAMP_LTZ$1`.
> >>>>>>>>>>
> >>>>>>>>>>     > I think we should rather keep JSON out of the concept
> >>>>>>>>>>
> >>>>>>>>>> Sounds ok to me. In SQL we also just call it "plan". I will change
> >>>>>>>>>> the file sections. But I would suggest keeping the fromJsonString
> >>>>>>>>>> method.
> >>>>>>>>>>
> >>>>>>>>>>     > write it back in the original plan file
> >>>>>>>>>>
> >>>>>>>>>> I updated the terminology section for what we consider an
> >>>>>>>>>> "upgrade". We might need to update the original plan file. This is
> >>>>>>>>>> already considered in the COMPILE PLAN ... FROM ... even though this
> >>>>>>>>>> is future work. Also savepoint migration.
> >>>>>>>>>>
> >>>>>>>>>> Thanks for all the feedback!
> >>>>>>>>>>
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 30.11.21 14:28, Timo Walther wrote:
> >>>>>>>>>>> Response to Wenlong's feedback:
> >>>>>>>>>>>
> >>>>>>>>>>>     > I would prefer not to provide such a shortcut, let users use
> >>>>>>>>>>> COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be
> >>>>>>>>>>> understood by new users even without inferring the docs.
> >>>>>>>>>>>
> >>>>>>>>>>> I would like to hear more opinions on this topic. Personally, I
> >>>>>>>>>>> find a combined statement very useful. Not only for quicker
> >>>>>>>>>>> development and debugging but also for readability. It helps in
> >>>>>>>>>>> keeping the JSON path and the query close to each other in order
> >>>>>>>>>>> to know the origin of the plan.
> >>>>>>>>>>>
> >>>>>>>>>>>     > but the plan and SQL are not matched. The result would be
> >>>>>>>>>>> quite confusing if we still execute the plan directly, we may need
> >>>>>>>>>>> to add a validation.
> >>>>>>>>>>>
> >>>>>>>>>>> You are right that there could be a mismatch. But we have a
> >>>>>>>>>>> similar problem when executing CREATE TABLE IF NOT EXISTS. The
> >>>>>>>>>>> schema or options of a table could have changed completely in the
> >>>>>>>>>>> catalog but the CREATE TABLE IF NOT EXISTS is not executed again.
> >>>>>>>>>>> So a mismatch could also occur there.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Timo
> >>>>>>>>>>>
> >>>>>>>>>>> On 30.11.21 14:17, Timo Walther wrote:
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> thanks for the feedback so far. Let me answer each email
> >>>>>>>>>>>> individually.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I will start with a response to Ingo's feedback:
> >>>>>>>>>>>>
> >>>>>>>>>>>>     > Will the JSON plan's schema be considered an API?
> >>>>>>>>>>>>
> >>>>>>>>>>>> No, not in the first version. This is explicitly mentioned in
> >>>>>>>>>>>> the `General JSON Plan Assumptions`. I tried to improve the
> >>>>>>>>>>>> section once more to make it clearer. However, the JSON plan is
> >>>>>>>>>>>> definitely stable per minor version. And since the plan is
> >>>>>>>>>>>> versioned by Flink version, external tooling could be built
> >>>>>>>>>>>> around it. We might make it public API once the design has
> >>>>>>>>>>>> settled.
> >>>>>>>>>>>>
> >>>>>>>>>>>>     > Given that upgrades across multiple versions at once are
> >>>>>>>>>>>> unsupported, do we verify this somehow?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Good question. I extended the `General JSON Plan Assumptions`.
> >>>>>>>>>>>> Now yes: the Flink version is part of the JSON plan and will be
> >>>>>>>>>>>> verified during restore. But keep in mind that we might support
> >>>>>>>>>>>> more than just the last version at least until the JSON plan has
> >>>>>>>>>>>> been migrated.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 30.11.21 09:39, Marios Trivyzas wrote:
> >>>>>>>>>>>>> I have a question regarding the `COMPILE PLAN OVERWRITE`. If we
> >>>>>>>>>>>>> choose to go with the config option instead, that doesn't
> >>>>>>>>>>>>> provide the flexibility to overwrite certain plans but not
> >>>>>>>>>>>>> others, since the config applies globally. Isn't that something
> >>>>>>>>>>>>> to consider?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Timo!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks a lot for taking all that time and effort to put
> >>>>>>>>>>>>>> together this proposal!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regarding:
> >>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades
> >>>>>>>>>>>>>>> use a step size of a single minor version. We don't
> >>>>>>>>>>>>>>> guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think that for this first step we should make it absolutely
> >>>>>>>>>>>>>> clear to the users that they would need to go through all
> >>>>>>>>>>>>>> intermediate versions to end up with the target version they
> >>>>>>>>>>>>>> wish. If we are to support skipping versions in the future,
> >>>>>>>>>>>>>> i.e. upgrade from 1.14 to 1.17, this means that we need to
> >>>>>>>>>>>>>> have a testing infrastructure in place that would test all
> >>>>>>>>>>>>>> possible combinations of version upgrades, i.e. from 1.14 to
> >>>>>>>>>>>>>> 1.15, from 1.14 to 1.16 and so forth, while still testing and
> >>>>>>>>>>>>>> of course supporting all the upgrades from the previous minor
> >>>>>>>>>>>>>> version.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I really like the idea of introducing HINTS to define some
> >>>>>>>>>>>>>> behaviour in the programs!
> >>>>>>>>>>>>>> - the hints live together with the sql statements and
> >>>>>>>>>>>>>> consequently the (JSON) plans.
> >>>>>>>>>>>>>> - If multiple queries are involved in a program, each one of
> >>>>>>>>>>>>>> them can define its own config (regarding plan optimisation,
> >>>>>>>>>>>>>> not null enforcement, etc)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I agree with Francesco on his argument regarding the *JSON*
> >>>>>>>>>>>>>> plan. I believe we should already provide flexibility here,
> >>>>>>>>>>>>>> since (who knows) in the future a JSON plan might not fulfil
> >>>>>>>>>>>>>> the desired functionality.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I also agree that we need some very obvious way (i.e. not a
> >>>>>>>>>>>>>> log entry) to show the users that their program doesn't
> >>>>>>>>>>>>>> support version upgrades, and prevent them from being
> >>>>>>>>>>>>>> negatively surprised in the future, when trying to upgrade
> >>>>>>>>>>>>>> their production pipelines.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This is an implementation detail, but I'd like to add that
> >>>>>>>>>>>>>> there should be some good logging in place when the upgrade is
> >>>>>>>>>>>>>> taking place, to be able to track every restoration action,
> >>>>>>>>>>>>>> and help debug any potential issues arising from that.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for writing this FLIP Timo. I think this will be a very
> >>>>>>>>>>>>>>> important improvement for Flink and our SQL users :-)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Similar to Francesco I would like to understand the statement
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades
> >>>>>>>>>>>>>>>> use a step size of a single minor version. We don't
> >>>>>>>>>>>>>>>> guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> a bit better. Is it because Flink does not guarantee that a
> >>>>>>>>>>>>>>> savepoint created by version 1.x can be directly recovered by
> >>>>>>>>>>>>>>> version 1.y with x + 1 < y but users might have to go through a
> >>>>>>>>>>>>>>> cascade of upgrades? From how I understand your proposal, the
> >>>>>>>>>>>>>>> compiled plan won't be changed after being written initially.
> >>>>>>>>>>>>>>> Hence, I would assume that for the plan alone Flink will have
> >>>>>>>>>>>>>>> to give backwards compatibility guarantees for all versions.
> >>>>>>>>>>>>>>> Am I understanding this part correctly?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani
> >>>>>>>>>>>>>>> <france...@ververica.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for putting this amazing work together, I have some
> >>>>>>>>>>>>>>>> considerations/questions about the FLIP:
> >>>>>>>>>>>>>>>> *Proposed changes #6*: Other than defining this rule of
> >>>>>>>>>>>>>>>> thumb, we must also make sure that compiling plans with these
> >>>>>>>>>>>>>>>> objects that cannot be serialized in the plan must fail hard,
> >>>>>>>>>>>>>>>> so users don't bite themselves with such issues, or at least
> >>>>>>>>>>>>>>>> we need to output warning logs. In general, whenever the user
> >>>>>>>>>>>>>>>> is trying to use the CompiledPlan APIs and at the same time,
> >>>>>>>>>>>>>>>> they're trying to do something "illegal" for the plan, we
> >>>>>>>>>>>>>>>> should immediately either log or fail depending on the issue,
> >>>>>>>>>>>>>>>> in order to avoid any surprises once the user upgrades.
> >>>>>>>>>>>>>>>> I would also say the same for things like registering a
> >>>>>>>>>>>>>>>> function, registering a DataStream, and for every other thing
> >>>>>>>>>>>>>>>> which won't end up in the plan, we should log such info to
> >>>>>>>>>>>>>>>> the user by default.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> *General JSON Plan Assumptions #9:* When thinking about
> >>>>>>>>>>>>>>>> connectors and formats, I think it's reasonable to assume and
> >>>>>>>>>>>>>>>> keep out of the feature design that no feature/ability can be
> >>>>>>>>>>>>>>>> deleted from a connector/format. I also don't think new
> >>>>>>>>>>>>>>>> features/abilities can influence this FLIP as well, given the
> >>>>>>>>>>>>>>>> plan is static, so if for example, MyCoolTableSink in the
> >>>>>>>>>>>>>>>> next flink version implements SupportsProjectionsPushDown,
> >>>>>>>>>>>>>>>> then it shouldn't be a problem for the upgrade story since
> >>>>>>>>>>>>>>>> the plan is still configured as computed from the previous
> >>>>>>>>>>>>>>>> flink version. What worries me is breaking changes, in
> >>>>>>>>>>>>>>>> particular behavioural changes that might happen in
> >>>>>>>>>>>>>>>> connectors/formats. Although this argument doesn't seem
> >>>>>>>>>>>>>>>> relevant for the connectors shipped by the flink project
> >>>>>>>>>>>>>>>> itself, because we try to keep them as stable as possible and
> >>>>>>>>>>>>>>>> avoid eventual breaking changes, it's compelling for external
> >>>>>>>>>>>>>>>> connectors and formats, which might be decoupled from the
> >>>>>>>>>>>>>>>> flink release cycle and might have different backward
> >>>>>>>>>>>>>>>> compatibility guarantees. It's totally reasonable if we don't
> >>>>>>>>>>>>>>>> want to tackle it in this first iteration of the feature, but
> >>>>>>>>>>>>>>>> it's something we need to keep in mind for the future.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> *Functions:* It's not clear to me what you mean by
> >>>>>>>>>>>>>>>> "identifier", because then somewhere else in the same context
> >>>>>>>>>>>>>>>> you talk about "name". Are we talking about the function name
> >>>>>>>>>>>>>>>> or the function complete signature? Let's assume for example
> >>>>>>>>>>>>>>>> we have these function definitions:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(BIGINT)
> >>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING)
> >>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING, STRING)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> These for me are very different functions with different
> >>>>>>>>>>>>>>>> implementations, where each of them might evolve separately
> >>>>>>>>>>>>>>>> at a different pace. Hence when we store them in the json
> >>>>>>>>>>>>>>>> plan we should perhaps use a logically defined unique id like
> >>>>>>>>>>>>>>>> /bigIntToTimestamp/, /stringToTimestamp/ and
> >>>>>>>>>>>>>>>> /stringToTimestampWithFormat/. This also solves the issue of
> >>>>>>>>>>>>>>>> correctly referencing the functions when restoring the plan,
> >>>>>>>>>>>>>>>> without running again the inference logic (which might have
> >>>>>>>>>>>>>>>> been changed in the meantime) and it might also solve the
> >>>>>>>>>>>>>>>> versioning, that is the function identifier can contain the
> >>>>>>>>>>>>>>>> function version like /stringToTimestampWithFormat_1_1/ or
> >>>>>>>>>>>>>>>> /stringToTimestampWithFormat_1_2/. An alternative could be to
> >>>>>>>>>>>>>>>> use the string signature representation, which might not be
> >>>>>>>>>>>>>>>> trivial to compute, given the complexity of our type
> >>>>>>>>>>>>>>>> inference logic.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> *The term "JSON plan"*: I think we should rather keep JSON
> >>>>>>>>>>>>>>>> out of the concept and just name it "Compiled Plan" (like the
> >>>>>>>>>>>>>>>> proposed API) or something similar, as I see how in future we
> >>>>>>>>>>>>>>>> might decide to support/modify our persistence format to
> >>>>>>>>>>>>>>>> something more efficient storage wise like BSON. For example,
> >>>>>>>>>>>>>>>> I would rename /CompiledPlan.fromJsonFile/ to simply
> >>>>>>>>>>>>>>>> /CompiledPlan.fromFile/.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> *Who is the owner of the plan file?* I asked myself this
> >>>>>>>>>>>>>>>> question when reading this:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades
> >>>>>>>>>>>>>>>>> use a step size of a single minor version. We don't
> >>>>>>>>>>>>>>>>> guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> My understanding of this statement is that a user can upgrade
> >>>>>>>>>>>>>>>> between minors but then following all the minors, the same
> >>>>>>>>>>>>>>>> query can remain up and running. E.g. I upgrade from 1.15 to
> >>>>>>>>>>>>>>>> 1.16, and then from 1.16 to 1.17 and I still expect my
> >>>>>>>>>>>>>>>> original query to work without recomputing the plan. This
> >>>>>>>>>>>>>>>> necessarily means that at some point in future releases we'll
> >>>>>>>>>>>>>>>> need some basic "migration" tool to keep the queries up and
> >>>>>>>>>>>>>>>> running, ending up modifying the compiled plan. So I guess
> >>>>>>>>>>>>>>>> flink should write it back in the original plan file, perhaps
> >>>>>>>>>>>>>>>> doing a backup of the previous one? Can you please clarify
> >>>>>>>>>>>>>>>> this aspect?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Apart from these considerations, the proposal looks good to
> >>>>>>>>>>>>>>>> me and I'm eagerly waiting to see it in play.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> FG
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> Francesco Guardiani | Software Engineer
> >>>>>>>>>>>>>>>> france...@ververica.com[1]
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Follow us @VervericaData
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> Join Flink Forward[2] - The Apache Flink Conference
> >>>>>>>>>>>>>>>> Stream Processing | Event Driven | Real Time
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin,
> Germany
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> Ververica GmbH
> >>>>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> >>>>>>>>>>>>>>>> Managing Directors: Karl Anton Wehner, Holger Temme, Yip
> >>> Park
> >>>>>>>>>>>>>>>> Tung
> >>>>>>>>>>>>>>> Jason,
> >>>>>>>>>>>>>>>> Jinwei (Kevin)
> >>>>>>>>>>>>>>>> Zhang
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --------
> >>>>>>>>>>>>>>>> [1] mailto:france...@ververica.com
> >>>>>>>>>>>>>>>> [2] https://flink-forward.org/
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> Marios
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >
>
>
