Hi, Timo, thanks for updating the doc.

I have a comment on plan migration:
I think we may need to add a version field for every exec node when
serialising. In earlier discussions, I think we have a conclusion that
treating the version of plan as the version of node, but in this case it
would be broken.
Take the following example in FLIP into consideration, there is a bad case:
when in 1.17, we introduced an incompatible version 3 and dropped version
1, we can only update the version to 2, so the version should be per exec
node.

ExecNode version *1* is not supported anymore. Even though the state is
actually compatible. The plan restore will fail with a helpful exception
that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely replace the old version *1* with *2. The
JSON plan flinkVersion changes to 1.17.*


Best,

Wenlong

On Thu, 9 Dec 2021 at 18:36, Timo Walther <twal...@apache.org> wrote:

> Hi Jing and Godfrey,
>
> I had another iteration over the document. There are two major changes:
>
> 1. Supported Flink Upgrade Versions
>
> I got the feedback via various channels that a step size of one minor
> version is not very convenient. As you said, "because upgrading to a new
> version is a time-consuming process". I rephrased this section:
>
> Upgrading usually involves work which is why many users perform this
> task rarely (e.g. only once per year). Also skipping a versions is
> common until a new feature has been introduced for which is it worth to
> upgrade. We will support the upgrade to the most recent Flink version
> from a set of previous versions. We aim to support upgrades from the
> last 2-3 releases on a best-effort basis; maybe even more depending on
> the maintenance overhead. However, in order to not grow the testing
> matrix infinitely and to perform important refactoring if necessary, we
> only guarantee upgrades with a step size of a single minor version (i.e.
> a cascade of upgrades).
>
> 2. Annotation Design
>
> I also adopted the multiple annotations design for the previous
> supportPlanFormat. So no array of versions anymore. I reworked the
> section, please have a look with updated examples:
>
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-ExecNodeTests
>
> I also got the feedback offline that `savepoint` might not be the right
> terminology for the annotation. I changed that to minPlanVersion and
> minStateVersion.
>
> Let me know what you think.
>
> Regards,
> Timo
>
>
>
> On 09.12.21 08:44, Jing Zhang wrote:
> > Hi Timo,
> > Thanks a lot for driving this discussion.
> > I believe it could solve many problems what we are suffering in
> upgrading.
> >
> > I only have a little complain on the following point.
> >
> >> For simplification of the design, we assume that upgrades use a step
> size
> > of a single minor version. We don't guarantee skipping minor versions
> (e.g.
> > 1.11 to
> > 1.14).
> >
> > In our internal production environment, we follow up with the community's
> > latest stable release version almost once a year because upgrading to a
> new
> > version is a time-consuming process.
> > So we might missed 1~3 version after we upgrade to the latest version.
> This
> > might also appears in other company too.
> > Could we guarantee FLIP-190 work if we skip minor versions less than
> > specified threshold?
> > Then we could know which version is good for us when prepare upgrading.
> >
> > Best,
> > Jing Zhang
> >
> > godfrey he <godfre...@gmail.com> 于2021年12月8日周三 22:16写道:
> >
> >> Hi Timo,
> >>
> >> Thanks for the explanation, it's much clearer now.
> >>
> >> One thing I want to confirm about `supportedPlanFormat `
> >> and `supportedSavepointFormat `:
> >> `supportedPlanFormat ` supports multiple versions,
> >> while `supportedSavepointFormat ` supports only one version ?
> >> A json plan  can be deserialized by multiple versions
> >> because default value will be set for new fields.
> >> In theory, a Savepoint can be restored by more than one version
> >> of the operators even if a state layout is changed,
> >> such as deleting a whole state and starting job with
> >> `allowNonRestoredState`=true.
> >> I think this is a corner case, and it's hard to understand comparing
> >> to `supportedPlanFormat ` supporting multiple versions.
> >> So, for most cases, when the state layout is changed, the savepoint is
> >> incompatible,
> >> and `supportedSavepointFormat` and version need to be changed.
> >>
> >> I think we need a detail explanation about the annotations change story
> in
> >> the java doc of  `ExecNodeMetadata` class for all developers
> >> (esp. those unfamiliar with this part).
> >>
> >> Best,
> >> Godfrey
> >>
> >> Timo Walther <twal...@apache.org> 于2021年12月8日周三 下午4:57写道:
> >>>
> >>> Hi Wenlong,
> >>>
> >>> thanks for the feedback. Great that we reached consensus here. I will
> >>> update the entire document with my previous example shortly.
> >>>
> >>>   > if we don't update the version when plan format changes, we can't
> >>> find that the plan can't not be deserialized in 1.15
> >>>
> >>> This should not be a problem as the entire plan file has a version as
> >>> well. We should not allow reading a 1.16 plan in 1.15. We can throw a
> >>> helpful exception early.
> >>>
> >>> Reading a 1.15 plan in 1.16 is possible until we drop the old
> >>> `supportedPlanFormat` from one of used ExecNodes. Afterwards all
> >>> `supportedPlanFormat` of ExecNodes must be equal or higher then the
> plan
> >>> version.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> On 08.12.21 03:07, wenlong.lwl wrote:
> >>>> Hi, Timo,  +1 for multi metadata.
> >>>>
> >>>> The compatible change I mean in the last email is the slight state
> >> change
> >>>> example you gave, so we have got  consensus on this actually, IMO.
> >>>>
> >>>> Another question based on the example you gave:
> >>>> In the example "JSON node gets an additional property in 1.16", if we
> >> don't
> >>>> update the version when plan format changes, we can't find that the
> >> plan
> >>>> can't not be deserialized in 1.15, although the savepoint state is
> >>>> compatible.
> >>>> The error message may be not so friendly if we just throw
> >> deserialization
> >>>> failure.
> >>>>
> >>>> On Tue, 7 Dec 2021 at 16:49, Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>>> Hi Wenlong,
> >>>>>
> >>>>>    > First,  we add a newStateLayout because of some improvement in
> >> state, in
> >>>>>    > order to keep compatibility we may still keep the old state for
> >> the
> >>>>> first
> >>>>>    > version. We need to update the version, so that we can generate
> a
> >> new
> >>>>>    > version plan for the new job and keep the exec node compatible
> >> with
> >>>>> the old
> >>>>>    > version plan.
> >>>>>
> >>>>> The problem that I see here for contributors is that the actual
> update
> >>>>> of a version is more complicated than just updating an integer value.
> >> It
> >>>>> means copying a lot of ExecNode code for a change that happens
> locally
> >>>>> in an operator. Let's assume multiple ExecNodes use a similar
> >> operator.
> >>>>> Why do we need to update all ExecNode versions, if the operator
> itself
> >>>>> can deal with the incompatibility. The ExecNode version is meant for
> >>>>> topology changes or fundamental state changes.
> >>>>>
> >>>>> If we don't find consensus on this topic, I would at least vote for
> >>>>> supporting multiple annotations for an ExecNode class. This way we
> >> don't
> >>>>> need to copy code but only add two ExecNode annotations with
> different
> >>>>> ExecNode versions.
> >>>>>
> >>>>>    > Maybe we can add support for this case :
> >>>>>    > when an exec node is changed in 1.16, but is compatible with
> 1.15,
> >>>>>    > we can use the node of 1.16 to deserialize the plan of 1.15.
> >>>>>
> >>>>> If the ExecNode is compatible, there is no reason to increase the
> >>>>> ExecNode version.
> >>>>>
> >>>>>
> >>>>>
> >>>>> I tried to come up with a reworked solution to make all parties
> happy:
> >>>>>
> >>>>> 1. Let's assume the following annotations:
> >>>>>
> >>>>> supportedPlanFormat = [1.15]
> >>>>>
> >>>>> supportedSavepointFormat = 1.15
> >>>>>
> >>>>> we drop `added` as it is equal to `supportedSavepointFormat`
> >>>>>
> >>>>> 2. Multiple annotations over ExecNodes are possible:
> >>>>>
> >>>>> // operator state changes
> >>>>>
> >>>>> // initial introduction in 1.15
> >>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>> supportedSavepointFormat=1.15)
> >>>>>
> >>>>> // state layout changed slightly in 1.16
> >>>>> // - operator migration is possible
> >>>>> // - operator supports state of both versions and will perform
> >> operator
> >>>>> state migration
> >>>>> // - new plans will get new ExecNode version
> >>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>> supportedSavepointFormat=1.15)
> >>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>> supportedSavepointFormat=1.16)
> >>>>>
> >>>>> // we force a plan migration in 1.17
> >>>>> // - we assume that all operator states have been migrated in the
> >>>>> previous version
> >>>>> // - we can safely replace the old version `1` with `2` and only keep
> >>>>> the new savepoint format
> >>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>> supportedSavepointFormat=1.16)
> >>>>>
> >>>>>
> >>>>> // plan changes
> >>>>>
> >>>>> // initial introduction in 1.15
> >>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>> supportedSavepointFormat=1.15)
> >>>>>
> >>>>> // JSON node gets an additional property in 1.16
> >>>>> // e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false}
> >>>>> // - ExecNode version does not change
> >>>>> // - ExecNode version only changes when topology or state is affected
> >>>>> // - we support both JSON plan formats, the old and the newest one
> >>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15,
> 1.16],
> >>>>> supportedSavepointFormat=1.15)
> >>>>>
> >>>>> // we force a plan migration in 1.17
> >>>>> // - now we only support 1.16 plan format
> >>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16,
> >>>>> supportedSavepointFormat=1.15)
> >>>>>
> >>>>>
> >>>>> // topology change
> >>>>>
> >>>>> // initial introduction in 1.15
> >>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>> supportedSavepointFormat=1.15)
> >>>>>
> >>>>> // complete new class structure in 1.16 annotated with
> >>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>> supportedSavepointFormat=1.16)
> >>>>>
> >>>>>
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 07.12.21 08:20, wenlong.lwl wrote:
> >>>>>> Maybe we can add support for this case :
> >>>>>>            when an exec node is changed in 1.16, but is compatible
> >> with
> >>>>> 1.15,
> >>>>>> we can use the node of 1.16 to deserialize the plan of 1.15.
> >>>>>> By this way, we don't need to fork the code if the change is
> >> compatible,
> >>>>>> and can avoid fork code frequently.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Wenlong
> >>>>>>
> >>>>>>
> >>>>>> On Tue, 7 Dec 2021 at 15:08, wenlong.lwl <wenlong88....@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> hi, Timo, I would prefer to update the version every time we change
> >> the
> >>>>>>> state layer too.
> >>>>>>>
> >>>>>>> It could be possible that we change the exec node in 2 steps:
> >>>>>>> First,  we add a newStateLayout because of some improvement in
> >> state, in
> >>>>>>> order to keep compatibility we may still keep the old state for the
> >>>>> first
> >>>>>>> version. We need to update the version, so that we can generate a
> >> new
> >>>>>>> version plan for the new job and keep the exec node compatible with
> >> the
> >>>>> old
> >>>>>>> version plan.
> >>>>>>> After some versions, we may remove the old version state layout and
> >>>>> clean
> >>>>>>> up the deprecated code. We still need to update the version, so
> >> that we
> >>>>> can
> >>>>>>> verify that we are compatible with the plan after the first change,
> >> but
> >>>>> not
> >>>>>>> compatible with the plan earlier.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Wenlong
> >>>>>>>
> >>>>>>> On Mon, 6 Dec 2021 at 21:27, Timo Walther <twal...@apache.org>
> >> wrote:
> >>>>>>>
> >>>>>>>> Hi Godfrey,
> >>>>>>>>
> >>>>>>>>     > design makes thing more complex.
> >>>>>>>>
> >>>>>>>> Yes, the design might be a bit more complex. But operator
> >> migration is
> >>>>>>>> way easier than ExecNode migration at a later point in time for
> >> code
> >>>>>>>> maintenance. We know that ExecNodes can become pretty complex.
> Even
> >>>>>>>> though we have put a lot of code into `CommonXXExecNode` it will
> >> be a
> >>>>>>>> lot of work to maintain multiple versions of ExecNodes. If we can
> >> avoid
> >>>>>>>> this with operator state migration, this should always be
> preferred
> >>>>> over
> >>>>>>>> a new ExecNode version.
> >>>>>>>>
> >>>>>>>> I'm aware that operator state migration might only be important
> for
> >>>>>>>> roughly 10 % of all changes. A new ExecNode version will be used
> >> for
> >>>>> 90%
> >>>>>>>> of all changes.
> >>>>>>>>
> >>>>>>>>     > If there are multiple state layouts, which layout the
> ExecNode
> >>>>> should
> >>>>>>>> use?
> >>>>>>>>
> >>>>>>>> It is not the responsibility of the ExecNode to decide this but
> the
> >>>>>>>> operator. Something like:
> >>>>>>>>
> >>>>>>>> class X extends ProcessFunction {
> >>>>>>>>       ValueState<A> oldStateLayout;
> >>>>>>>>       ValueState<B> newStateLayout;
> >>>>>>>>
> >>>>>>>>       open() {
> >>>>>>>>         if (oldStateLayout.get() != null) {
> >>>>>>>>           performOperatorMigration();
> >>>>>>>>         }
> >>>>>>>>         useNewStateLayout();
> >>>>>>>>       }
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> Operator migration is meant for smaller "more local" changes
> >> without
> >>>>>>>> touching the ExecNode layer. The CEP library and DataStream API
> >> sources
> >>>>>>>> are performing operator migration for years already.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>     > `supportedPlanChanges ` and `supportedSavepointChanges ` are
> >> a bit
> >>>>>>>> obscure.
> >>>>>>>>
> >>>>>>>> Let me try to come up with more examples why I think both
> >> annotation
> >>>>>>>> make sense and are esp. important *for test coverage*.
> >>>>>>>>
> >>>>>>>> supportedPlanChanges:
> >>>>>>>>
> >>>>>>>> Let's assume we have some JSON in Flink 1.15:
> >>>>>>>>
> >>>>>>>> {
> >>>>>>>>       some-prop: 42
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> And we want to extend the JSON in Flink 1.16:
> >>>>>>>>
> >>>>>>>> {
> >>>>>>>>       some-prop: 42,
> >>>>>>>>       some-flag: false
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> Maybe we don't need to increase the ExecNode version but only
> >> ensure
> >>>>>>>> that the flag is set to `false` by default for the older versions.
> >>>>>>>>
> >>>>>>>> We need a location to track changes and document the changelog.
> >> With
> >>>>> the
> >>>>>>>> help of the annotation supportedPlanChanges = [1.15, 1.16] we can
> >>>>> verify
> >>>>>>>> that we have tests for both JSON formats.
> >>>>>>>>
> >>>>>>>> And once we decide to drop the 1.15 format, we enforce plan
> >> migration
> >>>>>>>> and fill-in the default value `false` into the old plans and bump
> >> their
> >>>>>>>> JSON plan version to 1.16 or higher.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>     > once the state layout is changed, the ExecNode version needs
> >> also
> >>>>> be
> >>>>>>>> updated
> >>>>>>>>
> >>>>>>>> This will still be the majority of cases. But if we can avoid
> >> this, we
> >>>>>>>> should do it for not having too much duplicate code to maintain.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 06.12.21 09:58, godfrey he wrote:
> >>>>>>>>> Hi, Timo,
> >>>>>>>>>
> >>>>>>>>> Thanks for the detailed explanation.
> >>>>>>>>>
> >>>>>>>>>> We change an operator state of B in Flink 1.16. We perform the
> >> change
> >>>>>>>> in the operator of B in a way to support both state layouts. Thus,
> >> no
> >>>>> need
> >>>>>>>> for a new ExecNode version.
> >>>>>>>>>
> >>>>>>>>> I think this design makes thing more complex.
> >>>>>>>>> 1. If there are multiple state layouts, which layout the ExecNode
> >>>>>>>> should use ?
> >>>>>>>>> It increases the cost of understanding for developers (especially
> >> for
> >>>>>>>>> Flink newer),
> >>>>>>>>> making them prone to mistakes.
> >>>>>>>>> 2. `supportedPlanChanges ` and `supportedSavepointChanges ` are a
> >> bit
> >>>>>>>> obscure.
> >>>>>>>>>
> >>>>>>>>> The purpose of ExecNode annotations are not only to support
> >> powerful
> >>>>>>>> validation,
> >>>>>>>>> but more importantly to make it easy for developers to understand
> >>>>>>>>> to ensure that every modification is easy and state compatible.
> >>>>>>>>>
> >>>>>>>>> I prefer, once the state layout is changed, the ExecNode version
> >> needs
> >>>>>>>>> also be updated.
> >>>>>>>>> which could make thing simple. How about
> >>>>>>>>> rename `supportedPlanChanges ` to `planCompatibleVersion`
> >>>>>>>>> (which means the plan is compatible with the plan generated by
> the
> >>>>>>>>> given version node)
> >>>>>>>>>      and rename `supportedSavepointChanges` to
> >>>>> `savepointCompatibleVersion
> >>>>>>>> `
> >>>>>>>>> (which means the state is compatible with the state generated by
> >> the
> >>>>>>>>> given version node) ?
> >>>>>>>>> The names also indicate that only one version value can be set.
> >>>>>>>>>
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Godfrey
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Timo Walther <twal...@apache.org> 于2021年12月2日周四 下午11:42写道:
> >>>>>>>>>>
> >>>>>>>>>> Response to Marios's feedback:
> >>>>>>>>>>
> >>>>>>>>>>      > there should be some good logging in place when the
> >> upgrade is
> >>>>>>>> taking
> >>>>>>>>>> place
> >>>>>>>>>>
> >>>>>>>>>> Yes, I agree. I added this part to the FLIP.
> >>>>>>>>>>
> >>>>>>>>>>      > config option instead that doesn't provide the
> flexibility
> >> to
> >>>>>>>>>> overwrite certain plans
> >>>>>>>>>>
> >>>>>>>>>> One can set the config option also around sections of the
> >>>>>>>>>> multi-statement SQL script.
> >>>>>>>>>>
> >>>>>>>>>> SET 'table.plan.force-recompile'='true';
> >>>>>>>>>>
> >>>>>>>>>> COMPILE ...
> >>>>>>>>>>
> >>>>>>>>>> SET 'table.plan.force-recompile'='false';
> >>>>>>>>>>
> >>>>>>>>>> But the question is why a user wants to run COMPILE multiple
> >> times.
> >>>>> If
> >>>>>>>>>> it is during development, then running EXECUTE (or just the
> >> statement
> >>>>>>>>>> itself) without calling COMPILE should be sufficient. The file
> >> can
> >>>>> also
> >>>>>>>>>> manually be deleted if necessary.
> >>>>>>>>>>
> >>>>>>>>>> What do you think?
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 02.12.21 16:09, Timo Walther wrote:
> >>>>>>>>>>> Hi Till,
> >>>>>>>>>>>
> >>>>>>>>>>> Yes, you might have to. But not a new plan from the SQL query
> >> but a
> >>>>>>>>>>> migration from the old plan to the new plan. This will not
> >> happen
> >>>>>>>> often.
> >>>>>>>>>>> But we need a way to evolve the format of the JSON plan itself.
> >>>>>>>>>>>
> >>>>>>>>>>> Maybe this confuses a bit, so let me clarify it again: Mostly
> >>>>> ExecNode
> >>>>>>>>>>> versions and operator state layouts will evolve. Not the plan
> >> files,
> >>>>>>>>>>> those will be pretty stable. But also not infinitely.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Timo
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 02.12.21 16:01, Till Rohrmann wrote:
> >>>>>>>>>>>> Then for migrating from Flink 1.10 to 1.12, I might have to
> >> create
> >>>>> a
> >>>>>>>> new
> >>>>>>>>>>>> plan using Flink 1.11 in order to migrate from Flink 1.11 to
> >> 1.12,
> >>>>>>>> right?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Till
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <
> >> twal...@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Response to Till's feedback:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>       > compiled plan won't be changed after being written
> >> initially
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This is not entirely correct. We give guarantees for keeping
> >> the
> >>>>>>>> query
> >>>>>>>>>>>>> up and running. We reserve us the right to force plan
> >> migrations.
> >>>>> In
> >>>>>>>>>>>>> this case, the plan might not be created from the SQL
> >> statement
> >>>>> but
> >>>>>>>> from
> >>>>>>>>>>>>> the old plan. I have added an example in section 10.1.1. In
> >>>>> general,
> >>>>>>>>>>>>> both persisted entities "plan" and "savepoint" can evolve
> >>>>>>>> independently
> >>>>>>>>>>>>> from each other.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 02.12.21 15:10, Timo Walther wrote:
> >>>>>>>>>>>>>> Response to Godfrey's feedback:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END"
> is
> >>>>> missing.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the hint. I added a dedicated section 7.1.3.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       > it's hard to maintain the supported versions for
> >>>>>>>>>>>>>> "supportedPlanChanges" and "supportedSavepointChanges"
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Actually, I think we are mostly on the same page.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The annotation does not need to be updated for every Flink
> >>>>>>>> version. As
> >>>>>>>>>>>>>> the name suggests it is about "Changes" (in other words:
> >>>>>>>>>>>>>> incompatibilities) that require some kind of migration.
> >> Either
> >>>>> plan
> >>>>>>>>>>>>>> migration (= PlanChanges) or savepoint migration
> >>>>>>>> (=SavepointChanges,
> >>>>>>>>>>>>>> using operator migration or savepoint migration).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Let's assume we introduced two ExecNodes A and B in Flink
> >> 1.15.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The annotations are:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We change an operator state of B in Flink 1.16.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We perform the change in the operator of B in a way to
> >> support
> >>>>> both
> >>>>>>>>>>>>>> state layouts. Thus, no need for a new ExecNode version.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The annotations in 1.16 are:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>> supportedSavepointChanges=1.15, 1.16)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So the versions in the annotations are "start version"s.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I don't think we need end versions? End version would mean
> >> that
> >>>>> we
> >>>>>>>> drop
> >>>>>>>>>>>>>> the ExecNode from the code base?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Please check the section 10.1.1 again. I added a more
> complex
> >>>>>>>> example.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 01.12.21 16:29, Timo Walther wrote:
> >>>>>>>>>>>>>>> Response to Francesco's feedback:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>       > *Proposed changes #6*: Other than defining this
> rule
> >> of
> >>>>>>>> thumb, we
> >>>>>>>>>>>>>>> must also make sure that compiling plans with these objects
> >> that
> >>>>>>>>>>>>>>> cannot be serialized in the plan must fail hard
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yes, I totally agree. We will fail hard with a helpful
> >>>>> exception.
> >>>>>>>> Any
> >>>>>>>>>>>>>>> mistake e.g. using a inline object in Table API or an
> >> invalid
> >>>>>>>>>>>>>>> DataStream API source without uid should immediately fail a
> >> plan
> >>>>>>>>>>>>>>> compilation step. I added a remark to the FLIP again.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>       > What worries me is breaking changes, in particular
> >>>>>>>> behavioural
> >>>>>>>>>>>>>>> changes that might happen in connectors/formats
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Breaking changes in connectors and formats need to be
> >> encoded in
> >>>>>>>> the
> >>>>>>>>>>>>>>> options. I could also imagine to versioning in the factory
> >>>>>>>> identifier
> >>>>>>>>>>>>>>> `connector=kafka` and `connector=kafka-2`. If this is
> >> necessary.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> After thinking about your question again, I think we will
> >> also
> >>>>>>>> need
> >>>>>>>>>>>>>>> the same testing infrastructure for our connectors and
> >> formats.
> >>>>>>>> Esp.
> >>>>>>>>>>>>>>> restore tests and completeness test. I updated the document
> >>>>>>>>>>>>>>> accordingly. Also I added a way to generate UIDs for
> >> DataStream
> >>>>>>>> API
> >>>>>>>>>>>>>>> providers.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>       > *Functions:* Are we talking about the function name
> >> or the
> >>>>>>>>>>>>>>> function
> >>>>>>>>>>>>>>> complete signature?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For catalog functions, the identifier contains catalog name
> >> and
> >>>>>>>>>>>>>>> database name. For system functions, identifier contains
> >> only a
> >>>>>>>> name
> >>>>>>>>>>>>>>> which make function name and identifier identical. I
> >> reworked
> >>>>> the
> >>>>>>>>>>>>>>> section again and also fixed some of the naming conflicts
> >> you
> >>>>>>>>>>>>>>> mentioned.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>       > we should perhaps use a logically defined unique id
> >> like
> >>>>>>>>>>>>>>> /bigIntToTimestamp/
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I added a concrete example for the resolution and
> >> restoration.
> >>>>> The
> >>>>>>>>>>>>>>> unique id is composed of name + version. Internally, this
> is
> >>>>>>>>>>>>>>> represented as `$TO_TIMESTAMP_LTZ$1`.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>       > I think we should rather keep JSON out of the
> concept
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Sounds ok to me. In SQL we also just call it "plan". I will
> >>>>>>>> change the
> >>>>>>>>>>>>>>> file sections. But would suggest to keep the fromJsonString
> >>>>>>>> method.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>       > write it back in the original plan file
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I updated the terminology section for what we consider an
> >>>>>>>> "upgrade".
> >>>>>>>>>>>>>>> We might need to update the orginal plan file. This is
> >> already
> >>>>>>>>>>>>>>> considered in the COMPILE PLAN ... FROM ... even though
> >> this is
> >>>>>>>> future
> >>>>>>>>>>>>>>> work. Also savepoint migration.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for all the feedback!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 30.11.21 14:28, Timo Walther wrote:
> >>>>>>>>>>>>>>>> Response to Wenlongs's feedback:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>       > I would prefer not to provide such a shortcut, let
> >> users
> >>>>> use
> >>>>>>>>>>>>>>>> COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which
> >> can be
> >>>>>>>>>>>>>>>> understood by new users even without inferring the docs.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I would like to hear more opinions on this topic.
> >> Personally, I
> >>>>>>>> find
> >>>>>>>>>>>>>>>> a combined statement very useful. Not only for quicker
> >>>>>>>> development
> >>>>>>>>>>>>>>>> and debugging but also for readability. It helps in
> >> keeping the
> >>>>>>>> JSON
> >>>>>>>>>>>>>>>> path and the query close to each other in order to know
> the
> >>>>>>>> origin of
> >>>>>>>>>>>>>>>> the plan.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>       > but the plan and SQL are not matched. The result
> >> would be
> >>>>>>>> quite
> >>>>>>>>>>>>>>>> confusing if we still execute the plan directly, we may
> >> need to
> >>>>>>>> add a
> >>>>>>>>>>>>>>>> validation.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> You are right that there could be a mismatch. But we have
> a
> >>>>>>>> similar
> >>>>>>>>>>>>>>>> problem when executing CREATE TABLE IF NOT EXISTS. The
> >> schema
> >>>>> or
> >>>>>>>>>>>>>>>> options of a table could have changed completely in the
> >> catalog
> >>>>>>>> but
> >>>>>>>>>>>>>>>> the CREATE TABLE IF NOT EXISTS is not executed again. So a
> >>>>>>>> mismatch
> >>>>>>>>>>>>>>>> could also occur there.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 30.11.21 14:17, Timo Walther wrote:
> >>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> thanks for the feedback so far. Let me answer each email
> >>>>>>>>>>>>>>>>> indvidually.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I will start with a response to Ingo's feedback:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>       > Will the JSON plan's schema be considered an API?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> No, not in the first version. This is explicitly
> >> mentioned in
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> `General JSON Plan Assumptions`. I tried to improve the
> >>>>> section
> >>>>>>>> once
> >>>>>>>>>>>>>>>>> more to make it clearer. However, the JSON plan is
> >> definitely
> >>>>>>>> stable
> >>>>>>>>>>>>>>>>> per minor version. And since the plan is versioned by
> >> Flink
> >>>>>>>> version,
> >>>>>>>>>>>>>>>>> external tooling could be build around it. We might make
> >> it
> >>>>>>>> public
> >>>>>>>>>>>>>>>>> API once the design has settled.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>       > Given that upgrades across multiple versions at
> >> once are
> >>>>>>>>>>>>>>>>> unsupported, do we verify this somehow?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Good question. I extended the `General JSON Plan
> >> Assumptions`.
> >>>>>>>> Now
> >>>>>>>>>>>>>>>>> yes: the Flink version is part of the JSON plan and will
> >> be
> >>>>>>>> verified
> >>>>>>>>>>>>>>>>> during restore. But keep in mind that we might support
> >> more
> >>>>> that
> >>>>>>>>>>>>>>>>> just the last version at least until the JSON plan has
> >> been
> >>>>>>>>>>>>>>>>> migrated.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 30.11.21 09:39, Marios Trivyzas wrote:
> >>>>>>>>>>>>>>>>>> I have a question regarding the `COMPILE PLAN OVEWRITE`.
> >> If
> >>>>> we
> >>>>>>>>>>>>>>>>>> choose to go
> >>>>>>>>>>>>>>>>>> with the config option instead,
> >>>>>>>>>>>>>>>>>> that doesn't provide the flexibility to overwrite
> certain
> >>>>>>>> plans but
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>> others, since the config applies globally, isn't that
> >>>>>>>>>>>>>>>>>> something to consider?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <
> >>>>>>>> mat...@gmail.com>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Timo!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks a lot for taking all that time and effort to put
> >>>>>>>> together
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> proposal!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regarding:
> >>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that
> >> upgrades
> >>>>>>>> use a
> >>>>>>>>>>>>>>>>>>>> step size
> >>>>>>>>>>>>>>>>>>> of a single
> >>>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor
> >> versions
> >>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I think that for this first step we should make it
> >>>>> absolutely
> >>>>>>>>>>>>>>>>>>> clear to the
> >>>>>>>>>>>>>>>>>>> users that they would need to go through all
> >> intermediate
> >>>>>>>> versions
> >>>>>>>>>>>>>>>>>>> to end up with the target version they wish. If we are
> >> to
> >>>>>>>> support
> >>>>>>>>>>>>>>>>>>> skipping
> >>>>>>>>>>>>>>>>>>> versions in the future, i.e. upgrade from 1.14 to 1.17,
> >> this
> >>>>>>>> means
> >>>>>>>>>>>>>>>>>>> that we need to have a testing infrastructure in place
> >> that
> >>>>>>>> would
> >>>>>>>>>>>>>>>>>>> test all
> >>>>>>>>>>>>>>>>>>> possible combinations of version upgrades, i.e. from
> >> 1.14 to
> >>>>>>>> 1.15,
> >>>>>>>>>>>>>>>>>>> from 1.14 to 1.16 and so forth, while still testing and
> >> of
> >>>>>>>> course
> >>>>>>>>>>>>>>>>>>> supporting all the upgrades from the previous minor
> >> version.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I like a lot the idea of introducing HINTS to define
> >> some
> >>>>>>>>>>>>>>>>>>> behaviour in the
> >>>>>>>>>>>>>>>>>>> programs!
> >>>>>>>>>>>>>>>>>>> - the hints live together with the sql statements and
> >>>>>>>> consequently
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> (JSON) plans.
> >>>>>>>>>>>>>>>>>>> - If multiple queries are involved in a program, each
> >> one of
> >>>>>>>> them
> >>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>> define its own config (regarding plan optimisation, not
> >> null
> >>>>>>>>>>>>>>>>>>> enforcement,
> >>>>>>>>>>>>>>>>>>> etc)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I agree with Francesco on his argument regarding the
> >> *JSON*
> >>>>>>>>>>>>>>>>>>> plan. I
> >>>>>>>>>>>>>>>>>>> believe we should already provide flexibility here,
> >> since
> >>>>> (who
> >>>>>>>>>>>>>>>>>>> knows) in
> >>>>>>>>>>>>>>>>>>> the future
> >>>>>>>>>>>>>>>>>>> a JSON plan might not fulfil the desired functionality.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I also agree that we need some very obvious way (i.e.
> >> not
> >>>>> log
> >>>>>>>>>>>>>>>>>>> entry) to
> >>>>>>>>>>>>>>>>>>> show the users that their program doesn't support
> >> version
> >>>>>>>>>>>>>>>>>>> upgrades, and
> >>>>>>>>>>>>>>>>>>> prevent them from being negatively surprised in the
> >> future,
> >>>>>>>> when
> >>>>>>>>>>>>>>>>>>> trying to
> >>>>>>>>>>>>>>>>>>> upgrade their production pipelines.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> This is an implementation detail, but I'd like to add
> >> that
> >>>>>>>> there
> >>>>>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>>> some good logging in place when the upgrade is taking
> >> place,
> >>>>>>>> to be
> >>>>>>>>>>>>>>>>>>> able to track every restoration action, and help debug
> >> any
> >>>>>>>>>>>>>>>>>>> potential
> >>>>>>>>>>>>>>>>>>> issues arising from that.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann
> >>>>>>>>>>>>>>>>>>> <trohrm...@apache.org
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks for writing this FLIP Timo. I think this will
> >> be a
> >>>>>>>> very
> >>>>>>>>>>>>>>>>>>>> important
> >>>>>>>>>>>>>>>>>>>> improvement for Flink and our SQL user :-)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Similar to Francesco I would like to understand the
> >>>>> statement
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that
> >> upgrades
> >>>>>>>> use a
> >>>>>>>>>>>>>>>>>>>>> step
> >>>>>>>>>>>>>>>>>>>> size
> >>>>>>>>>>>>>>>>>>>> of a single
> >>>>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor
> >> versions
> >>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> a bit better. Is it because Flink does not guarantee
> >> that a
> >>>>>>>>>>>>>>>>>>>> savepoint
> >>>>>>>>>>>>>>>>>>>> created by version 1.x can be directly recovered by
> >> version
> >>>>>>>> 1.y
> >>>>>>>>>>>>>>>>>>>> with x + 1
> >>>>>>>>>>>>>>>>>>>> < y but users might have to go through a cascade of
> >>>>> upgrades?
> >>>>>>>>>>>>>>>>>>>>      From how I
> >>>>>>>>>>>>>>>>>>>> understand your proposal, the compiled plan won't be
> >>>>> changed
> >>>>>>>>>>>>>>>>>>>> after being
> >>>>>>>>>>>>>>>>>>>> written initially. Hence, I would assume that for the
> >> plan
> >>>>>>>> alone
> >>>>>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>> have to give backwards compatibility guarantees for
> all
> >>>>>>>> versions.
> >>>>>>>>>>>>>>>>>>>> Am I
> >>>>>>>>>>>>>>>>>>>> understanding this part correctly?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <
> >>>>>>>>>>>>>>>>>>>> france...@ververica.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for putting this amazing work together, I have
> >> some
> >>>>>>>>>>>>>>>>>>>>> considerations/questions
> >>>>>>>>>>>>>>>>>>>>> about the FLIP:
> >>>>>>>>>>>>>>>>>>>>> *Proposed changes #6*: Other than defining this rule
> >> of
> >>>>>>>> thumb,
> >>>>>>>>>>>>>>>>>>>>> we must
> >>>>>>>>>>>>>>>>>>>>> also make sure
> >>>>>>>>>>>>>>>>>>>>> that compiling plans with these objects that cannot
> be
> >>>>>>>>>>>>>>>>>>>>> serialized in the
> >>>>>>>>>>>>>>>>>>>>> plan must fail hard,
> >>>>>>>>>>>>>>>>>>>>> so users don't bite themselves with such issues, or
> at
> >>>>>>>> least we
> >>>>>>>>>>>>>>>>>>>>> need to
> >>>>>>>>>>>>>>>>>>>>> output warning
> >>>>>>>>>>>>>>>>>>>>> logs. In general, whenever the user is trying to use
> >> the
> >>>>>>>>>>>>>>>>>>>>> CompiledPlan
> >>>>>>>>>>>>>>>>>>>> APIs
> >>>>>>>>>>>>>>>>>>>>> and at the same
> >>>>>>>>>>>>>>>>>>>>> time, they're trying to do something "illegal" for
> the
> >>>>>>>> plan, we
> >>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>> immediately either
> >>>>>>>>>>>>>>>>>>>>> log or fail depending on the issue, in order to avoid
> >> any
> >>>>>>>>>>>>>>>>>>>>> surprises once
> >>>>>>>>>>>>>>>>>>>>> the user upgrades.
> >>>>>>>>>>>>>>>>>>>>> I would also say the same for things like registering
> >> a
> >>>>>>>>>>>>>>>>>>>>> function,
> >>>>>>>>>>>>>>>>>>>>> registering a DataStream,
> >>>>>>>>>>>>>>>>>>>>> and for every other thing which won't end up in the
> >> plan,
> >>>>> we
> >>>>>>>>>>>>>>>>>>>>> should log
> >>>>>>>>>>>>>>>>>>>>> such info to the
> >>>>>>>>>>>>>>>>>>>>> user by default.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> *General JSON Plan Assumptions #9:* When thinking to
> >>>>>>>> connectors
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>> formats, I think
> >>>>>>>>>>>>>>>>>>>>> it's reasonable to assume and keep out of the feature
> >>>>> design
> >>>>>>>>>>>>>>>>>>>>> that no
> >>>>>>>>>>>>>>>>>>>>> feature/ability can
> >>>>>>>>>>>>>>>>>>>>> deleted from a connector/format. I also don't think
> >> new
> >>>>>>>>>>>>>>>>>>>> features/abilities
> >>>>>>>>>>>>>>>>>>>>> can influence
> >>>>>>>>>>>>>>>>>>>>> this FLIP as well, given the plan is static, so if
> for
> >>>>>>>> example,
> >>>>>>>>>>>>>>>>>>>>> MyCoolTableSink in the next
> >>>>>>>>>>>>>>>>>>>>> flink version implements SupportsProjectionsPushDown,
> >> then
> >>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>> shouldn't
> >>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>> a problem
> >>>>>>>>>>>>>>>>>>>>> for the upgrade story since the plan is still
> >> configured
> >>>>> as
> >>>>>>>>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>> the previous flink
> >>>>>>>>>>>>>>>>>>>>> version. What worries me is breaking changes, in
> >>>>> particular
> >>>>>>>>>>>>>>>>>>>>> behavioural
> >>>>>>>>>>>>>>>>>>>>> changes that
> >>>>>>>>>>>>>>>>>>>>> might happen in connectors/formats. Although this
> >> argument
> >>>>>>>>>>>>>>>>>>>>> doesn't seem
> >>>>>>>>>>>>>>>>>>>>> relevant for
> >>>>>>>>>>>>>>>>>>>>> the connectors shipped by the flink project itself,
> >>>>> because
> >>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> try to
> >>>>>>>>>>>>>>>>>>>> keep
> >>>>>>>>>>>>>>>>>>>>> them as stable as
> >>>>>>>>>>>>>>>>>>>>> possible and avoid eventual breaking changes, it's
> >>>>>>>> compelling to
> >>>>>>>>>>>>>>>>>>>> external
> >>>>>>>>>>>>>>>>>>>>> connectors and
> >>>>>>>>>>>>>>>>>>>>> formats, which might be decoupled from the flink
> >> release
> >>>>>>>> cycle
> >>>>>>>>>>>>>>>>>>>>> and might
> >>>>>>>>>>>>>>>>>>>>> have different
> >>>>>>>>>>>>>>>>>>>>> backward compatibility guarantees. It's totally
> >> reasonable
> >>>>>>>> if we
> >>>>>>>>>>>>>>>>>>>>> don't
> >>>>>>>>>>>>>>>>>>>>> want to tackle it in
> >>>>>>>>>>>>>>>>>>>>> this first iteration of the feature, but it's
> >> something we
> >>>>>>>> need
> >>>>>>>>>>>>>>>>>>>>> to keep
> >>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>> mind for the future.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> *Functions:* It's not clear to me what you mean for
> >>>>>>>>>>>>>>>>>>>>> "identifier",
> >>>>>>>>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>>>>>> then somewhere
> >>>>>>>>>>>>>>>>>>>>> else in the same context you talk about "name". Are
> we
> >>>>>>>> talking
> >>>>>>>>>>>>>>>>>>>>> about the
> >>>>>>>>>>>>>>>>>>>>> function name
> >>>>>>>>>>>>>>>>>>>>> or the function complete signature? Let's assume for
> >>>>>>>> example we
> >>>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>> function
> >>>>>>>>>>>>>>>>>>>>> definitions:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(BIGINT)
> >>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING)
> >>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING, STRING)
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> These for me are very different functions with
> >> different
> >>>>>>>>>>>>>>>>>>>> implementations,
> >>>>>>>>>>>>>>>>>>>>> where each of
> >>>>>>>>>>>>>>>>>>>>> them might evolve separately at a different pace.
> >> Hence
> >>>>>>>> when we
> >>>>>>>>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>> them
> >>>>>>>>>>>>>>>>>>>>> in the json
> >>>>>>>>>>>>>>>>>>>>> plan we should perhaps use a logically defined unique
> >> id
> >>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>> /bigIntToTimestamp/, /
> >>>>>>>>>>>>>>>>>>>>> stringToTimestamp/ and /stringToTimestampWithFormat/.
> >> This
> >>>>>>>> also
> >>>>>>>>>>>>>>>>>>>>> solves
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> issue of
> >>>>>>>>>>>>>>>>>>>>> correctly referencing the functions when restoring
> the
> >>>>> plan,
> >>>>>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>>>> running again the
> >>>>>>>>>>>>>>>>>>>>> inference logic (which might have been changed in the
> >>>>>>>> meantime)
> >>>>>>>>>>>>>>>>>>>>> and it
> >>>>>>>>>>>>>>>>>>>>> might also solve
> >>>>>>>>>>>>>>>>>>>>> the versioning, that is the function identifier can
> >>>>> contain
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> function
> >>>>>>>>>>>>>>>>>>>>> version like /
> >>>>>>>>>>>>>>>>>>>>> stringToTimestampWithFormat_1_1 /or
> >>>>>>>>>>>>>>>>>>>>> /stringToTimestampWithFormat_1_2/.
> >>>>>>>>>>>>>>>>>>>> An
> >>>>>>>>>>>>>>>>>>>>> alternative could be to use the string signature
> >>>>>>>> representation,
> >>>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>> might not be trivial
> >>>>>>>>>>>>>>>>>>>>> to compute, given the complexity of our type
> inference
> >>>>>>>> logic.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> *The term "JSON plan"*: I think we should rather keep
> >> JSON
> >>>>>>>> out
> >>>>>>>>>>>>>>>>>>>>> of the
> >>>>>>>>>>>>>>>>>>>>> concept and just
> >>>>>>>>>>>>>>>>>>>>> name it "Compiled Plan" (like the proposed API) or
> >>>>> something
> >>>>>>>>>>>>>>>>>>>>> similar,
> >>>>>>>>>>>>>>>>>>>> as I
> >>>>>>>>>>>>>>>>>>>>> see how in
> >>>>>>>>>>>>>>>>>>>>> future we might decide to support/modify our
> >> persistence
> >>>>>>>>>>>>>>>>>>>>> format to
> >>>>>>>>>>>>>>>>>>>>> something more
> >>>>>>>>>>>>>>>>>>>>> efficient storage wise like BSON. For example, I
> would
> >>>>>>>> rename /
> >>>>>>>>>>>>>>>>>>>>> CompiledPlan.fromJsonFile/ to simply
> >>>>>>>> /CompiledPlan.fromFile/.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> *Who is the owner of the plan file?* I asked myself
> >> this
> >>>>>>>>>>>>>>>>>>>>> question when
> >>>>>>>>>>>>>>>>>>>>> reading this:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that
> >> upgrades
> >>>>>>>> use a
> >>>>>>>>>>>>>>>>>>>>>> step
> >>>>>>>>>>>>>>>>>>>>> size of a single
> >>>>>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor
> >> versions
> >>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> My understanding of this statement is that a user can
> >>>>>>>> upgrade
> >>>>>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>>>>> minors but then
> >>>>>>>>>>>>>>>>>>>>> following all the minors, the same query can remain
> >> up and
> >>>>>>>>>>>>> running.
> >>>>>>>>>>>>>>>>>>>> E.g. I
> >>>>>>>>>>>>>>>>>>>>> upgrade from
> >>>>>>>>>>>>>>>>>>>>> 1.15 to 1.16, and then from 1.16 to 1.17 and I still
> >>>>> expect
> >>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>>>>>> query to work
> >>>>>>>>>>>>>>>>>>>>> without recomputing the plan. This necessarily means
> >> that
> >>>>> at
> >>>>>>>>>>>>>>>>>>>>> some point
> >>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>> future
> >>>>>>>>>>>>>>>>>>>>> releases we'll need some basic "migration" tool to
> >> keep
> >>>>> the
> >>>>>>>>>>>>>>>>>>>>> queries up
> >>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>> running,
> >>>>>>>>>>>>>>>>>>>>> ending up modifying the compiled plan. So I guess
> >> flink
> >>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>> write it
> >>>>>>>>>>>>>>>>>>>>> back in the original
> >>>>>>>>>>>>>>>>>>>>> plan file, perhaps doing a backup of the previous
> >> one? Can
> >>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>> please
> >>>>>>>>>>>>>>>>>>>>> clarify this aspect?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Except these considerations, the proposal looks good
> >> to me
> >>>>>>>>>>>>>>>>>>>>> and I'm
> >>>>>>>>>>>>>>>>>>>> eagerly
> >>>>>>>>>>>>>>>>>>>>> waiting to see
> >>>>>>>>>>>>>>>>>>>>> it in play.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>> FG
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>> Francesco Guardiani | Software Engineer
> >>>>>>>>>>>>>>>>>>>>> france...@ververica.com[1]
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Follow us @VervericaData
> >>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>> Join Flink Forward[2] - The Apache Flink Conference
> >>>>>>>>>>>>>>>>>>>>> Stream Processing | Event Driven | Real Time
> >>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin,
> >>>>> Germany
> >>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>> Ververica GmbH
> >>>>>>>>>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244
> B
> >>>>>>>>>>>>>>>>>>>>> Managing Directors: Karl Anton Wehner, Holger Temme,
> >> Yip
> >>>>>>>> Park
> >>>>>>>>>>>>>>>>>>>>> Tung
> >>>>>>>>>>>>>>>>>>>> Jason,
> >>>>>>>>>>>>>>>>>>>>> Jinwei (Kevin)
> >>>>>>>>>>>>>>>>>>>>> Zhang
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> --------
> >>>>>>>>>>>>>>>>>>>>> [1] mailto:france...@ververica.com
> >>>>>>>>>>>>>>>>>>>>> [2] https://flink-forward.org/
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>> Marios
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to