Then for migrating from Flink 1.10 to 1.12, I might have to create a new
plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?
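
A minimal sketch of the upgrade chain this question implies, assuming the single-minor-version step size from the FLIP. The helper `upgrade_path` is hypothetical and not part of Flink; it only enumerates the hops and assumes both versions share the same major version.

```python
from __future__ import annotations


def upgrade_path(source: str, target: str) -> list[tuple[str, str]]:
    """Return the single-minor-version hops needed to get from source to target."""
    src_major, src_minor = (int(p) for p in source.split("."))
    tgt_major, tgt_minor = (int(p) for p in target.split("."))
    if src_major != tgt_major or src_minor > tgt_minor:
        raise ValueError(f"cannot upgrade from {source} to {target}")
    # Every intermediate minor version appears once as a target and once as a source.
    versions = [f"{src_major}.{m}" for m in range(src_minor, tgt_minor + 1)]
    return list(zip(versions, versions[1:]))


print(upgrade_path("1.10", "1.12"))
# [('1.10', '1.11'), ('1.11', '1.12')]
```

In this reading, each hop is a point where the intermediate version may have to write a migrated plan before the next hop can start.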

Cheers,
Till

On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:

> Response to Till's feedback:
>
>  > compiled plan won't be changed after being written initially
>
> This is not entirely correct. We give guarantees for keeping the query
> up and running. We reserve the right to force plan migrations. In
> this case, the plan might not be created from the SQL statement but from
> the old plan. I have added an example in section 10.1.1. In general,
> both persisted entities "plan" and "savepoint" can evolve independently
> from each other.
>
> Thanks,
> Timo
>
> On 02.12.21 15:10, Timo Walther wrote:
> > Response to Godfrey's feedback:
> >
> >  > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.
> >
> > Thanks for the hint. I added a dedicated section 7.1.3.
> >
> >
> >  > it's hard to maintain the supported versions for
> > "supportedPlanChanges" and "supportedSavepointChanges"
> >
> > Actually, I think we are mostly on the same page.
> >
> > The annotation does not need to be updated for every Flink version. As
> > the name suggests it is about "Changes" (in other words:
> > incompatibilities) that require some kind of migration. Either plan
> > migration (= PlanChanges) or savepoint migration (=SavepointChanges,
> > using operator migration or savepoint migration).
> >
> > Let's assume we introduced two ExecNodes A and B in Flink 1.15.
> >
> > The annotations are:
> >
> > @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> > supportedSavepointChanges=1.15)
> >
> > @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> > supportedSavepointChanges=1.15)
> >
> > We change an operator state of B in Flink 1.16.
> >
> > We perform the change in the operator of B in a way to support both
> > state layouts. Thus, no need for a new ExecNode version.
> >
> > The annotations in 1.16 are:
> >
> > @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> > supportedSavepointChanges=1.15)
> >
> > @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> > supportedSavepointChanges=1.15, 1.16)
> >
> > So the versions in the annotations are "start version"s.
> >
> > I don't think we need end versions? End version would mean that we drop
> > the ExecNode from the code base?
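
A minimal sketch of the "start version" reading of the annotations above, assuming that the change in effect for a given Flink version is the latest listed start version that is not newer than it. The dictionary `ANNOTATIONS` and the function `active_change` are hypothetical illustrations, not Flink APIs.

```python
# Hypothetical mirror of the @ExecNodeMetadata annotations as of Flink 1.16.
ANNOTATIONS = {
    "A": {"supportedPlanChanges": ["1.15"], "supportedSavepointChanges": ["1.15"]},
    "B": {"supportedPlanChanges": ["1.15"], "supportedSavepointChanges": ["1.15", "1.16"]},
}


def parse(version: str) -> tuple:
    """Turn '1.16' into (1, 16) so versions compare numerically, not as strings."""
    return tuple(int(p) for p in version.split("."))


def active_change(node: str, kind: str, flink_version: str) -> str:
    """Return the latest start version of `kind` that applies to flink_version."""
    starts = [v for v in ANNOTATIONS[node][kind] if parse(v) <= parse(flink_version)]
    if not starts:
        raise ValueError(f"{node} did not exist in Flink {flink_version}")
    return max(starts, key=parse)


print(active_change("B", "supportedSavepointChanges", "1.15"))  # 1.15
print(active_change("B", "supportedSavepointChanges", "1.17"))  # 1.16
print(active_change("A", "supportedSavepointChanges", "1.17"))  # 1.15
```

Under this assumption no end version is needed: a start version stays in effect until a later one is listed.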
> >
> > Please check the section 10.1.1 again. I added a more complex example.
> >
> >
> > Thanks,
> > Timo
> >
> >
> >
> > On 01.12.21 16:29, Timo Walther wrote:
> >> Response to Francesco's feedback:
> >>
> >>  > *Proposed changes #6*: Other than defining this rule of thumb, we
> >> must also make sure that compiling plans with these objects that
> >> cannot be serialized in the plan must fail hard
> >>
> >> Yes, I totally agree. We will fail hard with a helpful exception. Any
> >> mistake, e.g. using an inline object in Table API or an invalid
> >> DataStream API source without uid should immediately fail a plan
> >> compilation step. I added a remark to the FLIP again.
> >>
> >>  > What worries me is breaking changes, in particular behavioural
> >> changes that might happen in connectors/formats
> >>
> >> Breaking changes in connectors and formats need to be encoded in the
> >> options. I could also imagine versioning in the factory identifier,
> >> e.g. `connector=kafka` and `connector=kafka-2`, if this is necessary.
> >>
> >> After thinking about your question again, I think we will also need
> >> the same testing infrastructure for our connectors and formats. Esp.
> >> restore tests and completeness test. I updated the document
> >> accordingly. Also I added a way to generate UIDs for DataStream API
> >> providers.
> >>
> >>  > *Functions:* Are we talking about the function name or the function
> >> complete signature?
> >>
> >> For catalog functions, the identifier contains catalog name and
> >> database name. For system functions, the identifier contains only a
> >> name, which makes function name and identifier identical. I reworked the
> >> section again and also fixed some of the naming conflicts you mentioned.
> >>
> >>  > we should perhaps use a logically defined unique id like
> >> /bigIntToTimestamp/
> >>
> >> I added a concrete example for the resolution and restoration. The
> >> unique id is composed of name + version. Internally, this is
> >> represented as `$TO_TIMESTAMP_LTZ$1`.
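
A minimal sketch of the name + version composition described above. Only the single example `$TO_TIMESTAMP_LTZ$1` comes from the thread; the general `$<name>$<version>` layout and the helper names `to_internal_id` and `from_internal_id` are assumptions for illustration.

```python
from __future__ import annotations


def to_internal_id(name: str, version: int) -> str:
    """Compose an internal id from a function name and a version."""
    return f"${name}${version}"


def from_internal_id(internal_id: str) -> tuple[str, int]:
    """Split an internal id back into (name, version)."""
    if not internal_id.startswith("$"):
        raise ValueError(f"not an internal id: {internal_id!r}")
    # Split on the last '$' so the version is always the trailing component.
    name, _, version = internal_id[1:].rpartition("$")
    if not name or not version.isdigit():
        raise ValueError(f"malformed internal id: {internal_id!r}")
    return name, int(version)


print(to_internal_id("TO_TIMESTAMP_LTZ", 1))    # $TO_TIMESTAMP_LTZ$1
print(from_internal_id("$TO_TIMESTAMP_LTZ$1"))  # ('TO_TIMESTAMP_LTZ', 1)
```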
> >>
> >>  > I think we should rather keep JSON out of the concept
> >>
> >> Sounds ok to me. In SQL we also just call it "plan". I will change the
> >> file sections, but I would suggest keeping the fromJsonString method.
> >>
> >>  > write it back in the original plan file
> >>
> >> I updated the terminology section for what we consider an "upgrade".
> >> We might need to update the original plan file. This is already
> >> considered in the COMPILE PLAN ... FROM ... even though this is future
> >> work. Also savepoint migration.
> >>
> >> Thanks for all the feedback!
> >>
> >> Timo
> >>
> >>
> >> On 30.11.21 14:28, Timo Walther wrote:
> >>> Response to Wenlong's feedback:
> >>>
> >>>  > I would prefer not to provide such a shortcut, let users use
> >>> COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be
> >>> understood by new users even without inferring the docs.
> >>>
> >>> I would like to hear more opinions on this topic. Personally, I find
> >>> a combined statement very useful. Not only for quicker development
> >>> and debugging but also for readability. It helps in keeping the JSON
> >>> path and the query close to each other in order to know the origin of
> >>> the plan.
> >>>
> >>>  > but the plan and SQL are not matched. The result would be quite
> >>> confusing if we still execute the plan directly, we may need to add a
> >>> validation.
> >>>
> >>> You are right that there could be a mismatch. But we have a similar
> >>> problem when executing CREATE TABLE IF NOT EXISTS. The schema or
> >>> options of a table could have changed completely in the catalog but
> >>> the CREATE TABLE IF NOT EXISTS is not executed again. So a mismatch
> >>> could also occur there.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> On 30.11.21 14:17, Timo Walther wrote:
> >>>> Hi everyone,
> >>>>
> >>>> thanks for the feedback so far. Let me answer each email individually.
> >>>>
> >>>> I will start with a response to Ingo's feedback:
> >>>>
> >>>>  > Will the JSON plan's schema be considered an API?
> >>>>
> >>>> No, not in the first version. This is explicitly mentioned in the
> >>>> `General JSON Plan Assumptions`. I tried to improve the section once
> >>>> more to make it clearer. However, the JSON plan is definitely stable
> >>>> per minor version. And since the plan is versioned by Flink version,
> >>>> external tooling could be built around it. We might make it public
> >>>> API once the design has settled.
> >>>>
> >>>>  > Given that upgrades across multiple versions at once are
> >>>> unsupported, do we verify this somehow?
> >>>>
> >>>> Good question. I extended the `General JSON Plan Assumptions`. Now
> >>>> yes: the Flink version is part of the JSON plan and will be verified
> >>>> during restore. But keep in mind that we might support more than
> >>>> just the last version at least until the JSON plan has been migrated.
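
A minimal sketch of such a restore-time check, assuming the plan stores its Flink version under a JSON field named `flinkVersion` (a hypothetical name) and that by default only a single minor step is accepted. The `max_minor_steps` parameter reflects the remark that more than just the last version might be supported.

```python
from __future__ import annotations

import json


def parse_minor(version: str) -> tuple[int, int]:
    """Extract (major, minor) from a version string such as '1.15' or '1.15.2'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def verify_plan_version(plan_json: str, running_version: str, max_minor_steps: int = 1) -> None:
    """Raise ValueError if the plan is too old or too new for the running version."""
    plan = json.loads(plan_json)
    plan_major, plan_minor = parse_minor(plan["flinkVersion"])  # hypothetical field name
    run_major, run_minor = parse_minor(running_version)
    steps = run_minor - plan_minor
    if plan_major != run_major or not 0 <= steps <= max_minor_steps:
        raise ValueError(
            f"plan compiled with Flink {plan['flinkVersion']} cannot be "
            f"restored by Flink {running_version}"
        )


verify_plan_version('{"flinkVersion": "1.15"}', "1.16")  # accepted: one minor step
```

A plan from 1.15 restored on 1.17 would be rejected by this sketch unless `max_minor_steps` is raised.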
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> On 30.11.21 09:39, Marios Trivyzas wrote:
> >>>>> I have a question regarding the `COMPILE PLAN OVERWRITE`. If we
> >>>>> choose to go
> >>>>> with the config option instead,
> >>>>> that doesn't provide the flexibility to overwrite certain plans but
> >>>>> not
> >>>>> others, since the config applies globally, isn't that
> >>>>> something to consider?
> >>>>>
> >>>>> On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Timo!
> >>>>>>
> >>>>>> Thanks a lot for taking all that time and effort to put together
> >>>>>> this proposal!
> >>>>>>
> >>>>>> Regarding:
> >>>>>>> For simplification of the design, we assume that upgrades use a
> >>>>>>> step size of a single minor version. We don't guarantee skipping
> >>>>>>> minor versions (e.g. 1.11 to 1.14).
> >>>>>>
> >>>>>> I think that for this first step we should make it absolutely
> >>>>>> clear to the
> >>>>>> users that they would need to go through all intermediate versions
> >>>>>> to end up with the target version they wish. If we are to support
> >>>>>> skipping
> >>>>>> versions in the future, i.e. upgrade from 1.14 to 1.17, this means
> >>>>>> that we need to have a testing infrastructure in place that would
> >>>>>> test all
> >>>>>> possible combinations of version upgrades, i.e. from 1.14 to 1.15,
> >>>>>> from 1.14 to 1.16 and so forth, while still testing and of course
> >>>>>> supporting all the upgrades from the previous minor version.
> >>>>>>
> >>>>>> I really like the idea of introducing HINTS to define some
> >>>>>> behaviour in the programs!
> >>>>>> - The hints live together with the SQL statements and consequently
> >>>>>> the (JSON) plans.
> >>>>>> - If multiple queries are involved in a program, each one of them
> >>>>>> can define its own config (regarding plan optimisation, not null
> >>>>>> enforcement, etc.)
> >>>>>>
> >>>>>> I agree with Francesco on his argument regarding the *JSON* plan. I
> >>>>>> believe we should already provide flexibility here, since (who
> >>>>>> knows) in
> >>>>>> the future
> >>>>>> a JSON plan might not fulfil the desired functionality.
> >>>>>>
> >>>>>> I also agree that we need some very obvious way (i.e. not log
> >>>>>> entry) to
> >>>>>> show the users that their program doesn't support version
> >>>>>> upgrades, and
> >>>>>> prevent them from being negatively surprised in the future, when
> >>>>>> trying to
> >>>>>> upgrade their production pipelines.
> >>>>>>
> >>>>>> This is an implementation detail, but I'd like to add that there
> >>>>>> should be
> >>>>>> some good logging in place when the upgrade is taking place, to be
> >>>>>> able to track every restoration action, and help debug any potential
> >>>>>> issues arising from that.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks for writing this FLIP Timo. I think this will be a very
> >>>>>>> important improvement for Flink and our SQL users :-)
> >>>>>>>
> >>>>>>> Similar to Francesco I would like to understand the statement
> >>>>>>>
> >>>>>>>> For simplification of the design, we assume that upgrades use a
> >>>>>>>> step size of a single minor version. We don't guarantee skipping
> >>>>>>>> minor versions (e.g. 1.11 to 1.14).
> >>>>>>>
> >>>>>>> a bit better. Is it because Flink does not guarantee that a
> >>>>>>> savepoint
> >>>>>>> created by version 1.x can be directly recovered by version 1.y
> >>>>>>> with x + 1
> >>>>>>> < y but users might have to go through a cascade of upgrades?
> >>>>>>> From how I
> >>>>>>> understand your proposal, the compiled plan won't be changed
> >>>>>>> after being
> >>>>>>> written initially. Hence, I would assume that for the plan alone
> >>>>>>> Flink
> >>>>>>> will
> >>>>>>> have to give backwards compatibility guarantees for all versions.
> >>>>>>> Am I
> >>>>>>> understanding this part correctly?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Till
> >>>>>>>
> >>>>>>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <
> >>>>>>> france...@ververica.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Timo,
> >>>>>>>>
> >>>>>>>> Thanks for putting this amazing work together, I have some
> >>>>>>>> considerations/questions
> >>>>>>>> about the FLIP:
> >>>>>>>> *Proposed changes #6*: Other than defining this rule of thumb,
> >>>>>>>> we must
> >>>>>>>> also make sure
> >>>>>>>> that compiling plans with these objects that cannot be
> >>>>>>>> serialized in the
> >>>>>>>> plan must fail hard,
> >>>>>>>> so users don't bite themselves with such issues, or at least we
> >>>>>>>> need to
> >>>>>>>> output warning
> >>>>>>>> logs. In general, whenever the user is trying to use the
> >>>>>>>> CompiledPlan
> >>>>>>> APIs
> >>>>>>>> and at the same
> >>>>>>>> time, they're trying to do something "illegal" for the plan, we
> >>>>>>>> should
> >>>>>>>> immediately either
> >>>>>>>> log or fail depending on the issue, in order to avoid any
> >>>>>>>> surprises once
> >>>>>>>> the user upgrades.
> >>>>>>>> I would also say the same for things like registering a function,
> >>>>>>>> registering a DataStream,
> >>>>>>>> and for every other thing which won't end up in the plan, we
> >>>>>>>> should log
> >>>>>>>> such info to the
> >>>>>>>> user by default.
> >>>>>>>>
> >>>>>>>> *General JSON Plan Assumptions #9:* When thinking about connectors
> >>>>>>>> and formats, I think it's reasonable to assume, and keep out of the
> >>>>>>>> feature design, that no feature/ability can be deleted from a
> >>>>>>>> connector/format. I also don't think new features/abilities can
> >>>>>>>> influence this FLIP either, given the plan is static, so if for example,
> >>>>>>>> MyCoolTableSink in the next
> >>>>>>>> flink version implements SupportsProjectionsPushDown, then it
> >>>>>>>> shouldn't
> >>>>>>> be
> >>>>>>>> a problem
> >>>>>>>> for the upgrade story since the plan is still configured as
> >>>>>>>> computed
> >>>>>>> from
> >>>>>>>> the previous flink
> >>>>>>>> version. What worries me is breaking changes, in particular
> >>>>>>>> behavioural
> >>>>>>>> changes that
> >>>>>>>> might happen in connectors/formats. Although this argument
> >>>>>>>> doesn't seem
> >>>>>>>> relevant for
> >>>>>>>> the connectors shipped by the flink project itself, because we
> >>>>>>>> try to
> >>>>>>> keep
> >>>>>>>> them as stable as
> >>>>>>>> possible and avoid eventual breaking changes, it's compelling to
> >>>>>>> external
> >>>>>>>> connectors and
> >>>>>>>> formats, which might be decoupled from the flink release cycle
> >>>>>>>> and might
> >>>>>>>> have different
> >>>>>>>> backward compatibility guarantees. It's totally reasonable if we
> >>>>>>>> don't
> >>>>>>>> want to tackle it in
> >>>>>>>> this first iteration of the feature, but it's something we need
> >>>>>>>> to keep
> >>>>>>> in
> >>>>>>>> mind for the future.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> *Functions:* It's not clear to me what you mean for "identifier",
> >>>>>>> because
> >>>>>>>> then somewhere
> >>>>>>>> else in the same context you talk about "name". Are we talking
> >>>>>>>> about the
> >>>>>>>> function name
> >>>>>>>> or the function complete signature? Let's assume for example we
> >>>>>>>> have
> >>>>>>> these
> >>>>>>>> function
> >>>>>>>> definitions:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> * TO_TIMESTAMP_LTZ(BIGINT)
> >>>>>>>> * TO_TIMESTAMP_LTZ(STRING)
> >>>>>>>> * TO_TIMESTAMP_LTZ(STRING, STRING)
> >>>>>>>>
> >>>>>>>> These for me are very different functions with different
> >>>>>>> implementations,
> >>>>>>>> where each of
> >>>>>>>> them might evolve separately at a different pace. Hence when we
> >>>>>>>> store
> >>>>>>> them
> >>>>>>>> in the json
> >>>>>>>> plan we should perhaps use a logically defined unique id like
> >>>>>>>> /bigIntToTimestamp/, /stringToTimestamp/ and
> >>>>>>>> /stringToTimestampWithFormat/. This also solves the issue of
> >>>>>>>> correctly referencing the functions when restoring the plan,
> >>>>>>>> without
> >>>>>>>> running again the
> >>>>>>>> inference logic (which might have been changed in the meantime)
> >>>>>>>> and it
> >>>>>>>> might also solve
> >>>>>>>> the versioning, that is the function identifier can contain the
> >>>>>>>> function version like /stringToTimestampWithFormat_1_1/ or
> >>>>>>>> /stringToTimestampWithFormat_1_2/. An alternative could be to use
> >>>>>>>> the string signature representation, which might not be trivial
> >>>>>>>> to compute, given the complexity of our type inference logic.
> >>>>>>>>
> >>>>>>>> *The term "JSON plan"*: I think we should rather keep JSON out
> >>>>>>>> of the
> >>>>>>>> concept and just
> >>>>>>>> name it "Compiled Plan" (like the proposed API) or something
> >>>>>>>> similar,
> >>>>>>> as I
> >>>>>>>> see how in
> >>>>>>>> future we might decide to support/modify our persistence format to
> >>>>>>>> something more
> >>>>>>>> efficient storage wise like BSON. For example, I would rename
> >>>>>>>> /CompiledPlan.fromJsonFile/ to simply /CompiledPlan.fromFile/.
> >>>>>>>>
> >>>>>>>> *Who is the owner of the plan file?* I asked myself this
> >>>>>>>> question when
> >>>>>>>> reading this:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> For simplification of the design, we assume that upgrades use a
> >>>>>>>>> step size of a single minor version. We don't guarantee skipping
> >>>>>>>>> minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>
> >>>>>>>> My understanding of this statement is that a user can upgrade
> >>>>>>>> between minors, but then, following all the minors, the same query
> >>>>>>>> can remain up and running. E.g. I upgrade from 1.15 to 1.16, and
> >>>>>>>> then from 1.16 to 1.17, and I still expect my original query to work
> >>>>>>>> without recomputing the plan. This necessarily means that at
> >>>>>>>> some point
> >>>>>>> in
> >>>>>>>> future
> >>>>>>>> releases we'll need some basic "migration" tool to keep the
> >>>>>>>> queries up
> >>>>>>> and
> >>>>>>>> running,
> >>>>>>>> ending up modifying the compiled plan. So I guess flink should
> >>>>>>>> write it
> >>>>>>>> back in the original
> >>>>>>>> plan file, perhaps doing a backup of the previous one? Can you
> >>>>>>>> please
> >>>>>>>> clarify this aspect?
> >>>>>>>>
> >>>>>>>> Except these considerations, the proposal looks good to me and I'm
> >>>>>>> eagerly
> >>>>>>>> waiting to see
> >>>>>>>> it in play.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> FG
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Francesco Guardiani | Software Engineer
> >>>>>>>> france...@ververica.com
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Marios
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>
