Then for migrating from Flink 1.10 to 1.12, I might have to create a new plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?
Cheers,
Till

On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:

> Response to Till's feedback:
>
> > compiled plan won't be changed after being written initially
>
> This is not entirely correct. We give guarantees for keeping the query up and running. We reserve the right to force plan migrations. In this case, the plan might not be created from the SQL statement but from the old plan. I have added an example in section 10.1.1. In general, both persisted entities "plan" and "savepoint" can evolve independently from each other.
>
> Thanks,
> Timo
>
> On 02.12.21 15:10, Timo Walther wrote:
> > Response to Godfrey's feedback:
> >
> > > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.
> >
> > Thanks for the hint. I added a dedicated section 7.1.3.
> >
> > > it's hard to maintain the supported versions for "supportedPlanChanges" and "supportedSavepointChanges"
> >
> > Actually, I think we are mostly on the same page.
> >
> > The annotation does not need to be updated for every Flink version. As the name suggests, it is about "Changes" (in other words: incompatibilities) that require some kind of migration: either plan migration (= PlanChanges) or savepoint migration (= SavepointChanges, using operator migration or savepoint migration).
> >
> > Let's assume we introduced two ExecNodes A and B in Flink 1.15.
> >
> > The annotations are:
> >
> > @ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)
> > @ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)
> >
> > We change an operator state of B in Flink 1.16.
> >
> > We perform the change in the operator of B in a way that supports both state layouts. Thus, there is no need for a new ExecNode version.
> >
> > The annotations in 1.16 are:
> >
> > @ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)
> > @ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15, 1.16)
> >
> > So the versions in the annotations are "start versions".
> >
> > I don't think we need end versions? An end version would mean that we drop the ExecNode from the code base.
> >
> > Please check section 10.1.1 again. I added a more complex example.
> >
> > Thanks,
> > Timo
> >
> > On 01.12.21 16:29, Timo Walther wrote:
> >> Response to Francesco's feedback:
> >>
> >> > *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan must fail hard
> >>
> >> Yes, I totally agree. We will fail hard with a helpful exception. Any mistake, e.g. using an inline object in Table API or an invalid DataStream API source without a uid, should immediately fail a plan compilation step. I added a remark to the FLIP again.
> >>
> >> > What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats
> >>
> >> Breaking changes in connectors and formats need to be encoded in the options. I could also imagine versioning in the factory identifier, e.g. `connector=kafka` and `connector=kafka-2`, if this becomes necessary.
> >>
> >> After thinking about your question again, I think we will also need the same testing infrastructure for our connectors and formats, especially restore tests and completeness tests. I updated the document accordingly. I also added a way to generate UIDs for DataStream API providers.
> >>
> >> > *Functions:* Are we talking about the function name or the function's complete signature?
> >>
> >> For catalog functions, the identifier contains catalog name and database name.
> >> For system functions, the identifier contains only a name, which makes function name and identifier identical. I reworked the section again and also fixed some of the naming conflicts you mentioned.
> >>
> >> > we should perhaps use a logically defined unique id like /bigIntToTimestamp/
> >>
> >> I added a concrete example for the resolution and restoration. The unique id is composed of name + version. Internally, this is represented as `$TO_TIMESTAMP_LTZ$1`.
> >>
> >> > I think we should rather keep JSON out of the concept
> >>
> >> Sounds ok to me. In SQL we also just call it "plan". I will change the file sections but would suggest keeping the fromJsonString method.
> >>
> >> > write it back in the original plan file
> >>
> >> I updated the terminology section for what we consider an "upgrade". We might need to update the original plan file. This is already considered in COMPILE PLAN ... FROM ..., even though this is future work. The same applies to savepoint migration.
> >>
> >> Thanks for all the feedback!
> >>
> >> Timo
> >>
> >> On 30.11.21 14:28, Timo Walther wrote:
> >>> Response to Wenlong's feedback:
> >>>
> >>> > I would prefer not to provide such a shortcut, let users use COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by new users even without referring to the docs.
> >>>
> >>> I would like to hear more opinions on this topic. Personally, I find a combined statement very useful, not only for quicker development and debugging but also for readability. It helps in keeping the JSON path and the query close to each other in order to know the origin of the plan.
> >>>
> >>> > but the plan and SQL are not matched. The result would be quite confusing if we still execute the plan directly, we may need to add a validation.
> >>>
> >>> You are right that there could be a mismatch. But we have a similar problem when executing CREATE TABLE IF NOT EXISTS.
> >>> The schema or options of a table could have changed completely in the catalog, but the CREATE TABLE IF NOT EXISTS is not executed again, so a mismatch could also occur there.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> On 30.11.21 14:17, Timo Walther wrote:
> >>>> Hi everyone,
> >>>>
> >>>> thanks for the feedback so far. Let me answer each email individually.
> >>>>
> >>>> I will start with a response to Ingo's feedback:
> >>>>
> >>>> > Will the JSON plan's schema be considered an API?
> >>>>
> >>>> No, not in the first version. This is explicitly mentioned in the `General JSON Plan Assumptions`. I tried to improve the section once more to make it clearer. However, the JSON plan is definitely stable per minor version. And since the plan is versioned by Flink version, external tooling could be built around it. We might make it public API once the design has settled.
> >>>>
> >>>> > Given that upgrades across multiple versions at once are unsupported, do we verify this somehow?
> >>>>
> >>>> Good question. I extended the `General JSON Plan Assumptions`. Now yes: the Flink version is part of the JSON plan and will be verified during restore. But keep in mind that we might support more than just the last version, at least until the JSON plan has been migrated.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> On 30.11.21 09:39, Marios Trivyzas wrote:
> >>>>> I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go with the config option instead, that doesn't provide the flexibility to overwrite certain plans but not others, since the config applies globally. Isn't that something to consider?
> >>>>>
> >>>>> On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Timo!
> >>>>>>
> >>>>>> Thanks a lot for taking all that time and effort to put together this proposal!
> >>>>>>
> >>>>>> Regarding:
> >>>>>>> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>
> >>>>>> I think that for this first step we should make it absolutely clear to the users that they would need to go through all intermediate versions to end up with the target version they wish. If we are to support skipping versions in the future, i.e. upgrading from 1.14 to 1.17, this means that we need to have a testing infrastructure in place that would test all possible combinations of version upgrades, i.e. from 1.14 to 1.15, from 1.14 to 1.16 and so forth, while still testing and of course supporting all the upgrades from the previous minor version.
> >>>>>>
> >>>>>> I very much like the idea of introducing HINTS to define some behaviour in the programs:
> >>>>>> - The hints live together with the SQL statements and consequently the (JSON) plans.
> >>>>>> - If multiple queries are involved in a program, each one of them can define its own config (regarding plan optimisation, not-null enforcement, etc.).
> >>>>>>
> >>>>>> I agree with Francesco on his argument regarding the *JSON* plan. I believe we should already provide flexibility here, since (who knows) in the future a JSON plan might not fulfil the desired functionality.
> >>>>>>
> >>>>>> I also agree that we need some very obvious way (i.e. not a log entry) to show the users that their program doesn't support version upgrades, and prevent them from being negatively surprised in the future, when trying to upgrade their production pipelines.
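[Editor's note: the single-minor-version upgrade step quoted above, together with Timo's earlier remark that the Flink version is stored in the JSON plan and verified during restore, can be sketched as below. This is a self-contained illustration only; the class and method names are made up and are not Flink API.]

```java
// Hypothetical restore-time version check: the plan records the Flink
// version that compiled it, and an upgrade may only step one minor
// version at a time (e.g. 1.15 -> 1.16, but not 1.14 -> 1.17).
public class PlanVersionCheck {

    static int minorOf(String version) {
        // "1.15" -> 15; assumes a simple "major.minor" version string
        String[] parts = version.split("\\.");
        return Integer.parseInt(parts[1]);
    }

    static boolean canRestore(String planVersion, String runtimeVersion) {
        int step = minorOf(runtimeVersion) - minorOf(planVersion);
        // same version (plain restart) or exactly one minor upgrade
        return step == 0 || step == 1;
    }

    public static void main(String[] args) {
        System.out.println(canRestore("1.15", "1.16")); // true
        System.out.println(canRestore("1.14", "1.17")); // false
    }
}
```

Under this check, going from 1.10 to 1.12 (Till's question at the top of the thread) would indeed require a stop at 1.11 first.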
> >>>>>>
> >>>>>> This is an implementation detail, but I'd like to add that there should be some good logging in place when the upgrade is taking place, to be able to track every restoration action and help debug any potential issues arising from that.
> >>>>>>
> >>>>>> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >>>>>>
> >>>>>>> Thanks for writing this FLIP, Timo. I think this will be a very important improvement for Flink and our SQL users :-)
> >>>>>>>
> >>>>>>> Similar to Francesco, I would like to understand the statement
> >>>>>>>
> >>>>>>>> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>
> >>>>>>> a bit better. Is it because Flink does not guarantee that a savepoint created by version 1.x can be directly recovered by version 1.y with x + 1 < y, but users might have to go through a cascade of upgrades? From how I understand your proposal, the compiled plan won't be changed after being written initially. Hence, I would assume that for the plan alone Flink will have to give backwards compatibility guarantees for all versions. Am I understanding this part correctly?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Till
> >>>>>>>
> >>>>>>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <france...@ververica.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Timo,
> >>>>>>>>
> >>>>>>>> Thanks for putting this amazing work together. I have some considerations/questions about the FLIP:
> >>>>>>>>
> >>>>>>>> *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan fails hard, so users don't bite themselves with such issues, or at least we need to output warning logs. In general, whenever the user is trying to use the CompiledPlan APIs and at the same time they're trying to do something "illegal" for the plan, we should immediately either log or fail depending on the issue, in order to avoid any surprises once the user upgrades. I would also say the same for things like registering a function, registering a DataStream, and every other thing which won't end up in the plan: we should log such info to the user by default.
> >>>>>>>>
> >>>>>>>> *General JSON Plan Assumptions #9:* When thinking of connectors and formats, I think it's reasonable to assume, and keep out of the feature design, that no feature/ability can be deleted from a connector/format.
> >>>>>>>> I also don't think new features/abilities can influence this FLIP, given the plan is static. So if, for example, MyCoolTableSink in the next Flink version implements SupportsProjectionsPushDown, then it shouldn't be a problem for the upgrade story, since the plan is still configured as computed from the previous Flink version. What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats. Although this argument doesn't seem relevant for the connectors shipped by the Flink project itself, because we try to keep them as stable as possible and avoid eventual breaking changes, it is a real concern for external connectors and formats, which might be decoupled from the Flink release cycle and might have different backward compatibility guarantees. It's totally reasonable if we don't want to tackle it in this first iteration of the feature, but it's something we need to keep in mind for the future.
> >>>>>>>>
> >>>>>>>> *Functions:* It's not clear to me what you mean by "identifier", because somewhere else in the same context you talk about "name". Are we talking about the function name or the function's complete signature?
> >>>>>>>> Let's assume for example we have these function definitions:
> >>>>>>>>
> >>>>>>>> * TO_TIMESTAMP_LTZ(BIGINT)
> >>>>>>>> * TO_TIMESTAMP_LTZ(STRING)
> >>>>>>>> * TO_TIMESTAMP_LTZ(STRING, STRING)
> >>>>>>>>
> >>>>>>>> These for me are very different functions with different implementations, where each of them might evolve separately at a different pace. Hence, when we store them in the JSON plan, we should perhaps use a logically defined unique id like /bigIntToTimestamp/, /stringToTimestamp/ and /stringToTimestampWithFormat/. This also solves the issue of correctly referencing the functions when restoring the plan, without running the inference logic again (which might have been changed in the meantime), and it might also solve the versioning, that is, the function identifier can contain the function version, like /stringToTimestampWithFormat_1_1/ or /stringToTimestampWithFormat_1_2/. An alternative could be to use the string signature representation, which might not be trivial to compute, given the complexity of our type inference logic.
> >>>>>>>>
> >>>>>>>> *The term "JSON plan"*: I think we should rather keep JSON out of the concept and just name it "Compiled Plan" (like the proposed API) or something similar, as I can see how in the future we might decide to change our persistence format to something more storage-efficient, like BSON. For example, I would rename /CompiledPlan.fromJsonFile/ to simply /CompiledPlan.fromFile/.
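[Editor's note: Timo's reply earlier in the thread states that the unique id is composed of name + version and internally represented as `$TO_TIMESTAMP_LTZ$1`. A self-contained sketch of composing and parsing such an identifier follows; the helper class and method names are purely illustrative, not Flink API.]

```java
// Illustrative encoding of the internal function identifier format
// mentioned in the thread: $NAME$VERSION, e.g. "$TO_TIMESTAMP_LTZ$1".
public class FunctionIds {

    // Compose the internal identifier from function name and version.
    static String serialize(String name, int version) {
        return "$" + name + "$" + version;
    }

    // Extract the function name between the two '$' separators.
    static String parseName(String id) {
        return id.substring(1, id.lastIndexOf('$'));
    }

    // Extract the version after the last '$'.
    static int parseVersion(String id) {
        return Integer.parseInt(id.substring(id.lastIndexOf('$') + 1));
    }

    public static void main(String[] args) {
        String id = serialize("TO_TIMESTAMP_LTZ", 1);
        System.out.println(id);               // $TO_TIMESTAMP_LTZ$1
        System.out.println(parseName(id));    // TO_TIMESTAMP_LTZ
        System.out.println(parseVersion(id)); // 1
    }
}
```

Such a scheme keeps restore independent of the current type inference logic, since the plan pins an exact (name, version) pair instead of re-resolving overloads.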
> >>>>>>>>
> >>>>>>>> *Who is the owner of the plan file?* I asked myself this question when reading this:
> >>>>>>>>
> >>>>>>>>> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>
> >>>>>>>> My understanding of this statement is that a user can upgrade between minors, but then, following all the minors, the same query can remain up and running. E.g. I upgrade from 1.15 to 1.16, and then from 1.16 to 1.17, and I still expect my original query to work without recomputing the plan. This necessarily means that at some point in future releases we'll need some basic "migration" tool to keep the queries up and running, ending up modifying the compiled plan. So I guess Flink should write it back to the original plan file, perhaps making a backup of the previous one? Can you please clarify this aspect?
> >>>>>>>>
> >>>>>>>> Except for these considerations, the proposal looks good to me and I'm eagerly waiting to see it in play.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> FG
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Francesco Guardiani | Software Engineer
> >>>>>>>> france...@ververica.com[1]
> >>>>>>>>
> >>>>>>>> Follow us @VervericaData
> >>>>>>>> --
> >>>>>>>> Join Flink Forward[2] - The Apache Flink Conference
> >>>>>>>> Stream Processing | Event Driven | Real Time
> >>>>>>>> --
> >>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >>>>>>>> --
> >>>>>>>> Ververica GmbH
> >>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> >>>>>>>> Managing Directors: Karl Anton Wehner, Holger Temme, Yip Park Tung Jason, Jinwei (Kevin) Zhang
> >>>>>>>>
> >>>>>>>> --------
> >>>>>>>> [1] mailto:france...@ververica.com
> >>>>>>>> [2] https://flink-forward.org/
> >>>>>>
> >>>>>> --
> >>>>>> Marios
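[Editor's note: as an appendix to the ExecNodeMetadata example discussed at the top of the thread, here is a self-contained sketch of how such "start version" metadata could be declared and queried. The annotation below is a stand-in written for illustration; Flink's actual ExecNodeMetadata annotation may differ.]

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;

public class ExecNodeVersions {

    // Stand-in for the annotation discussed in the thread.
    @Retention(RetentionPolicy.RUNTIME)
    @interface ExecNodeMetadata {
        String name();
        String[] supportedPlanChanges();
        String[] supportedSavepointChanges();
    }

    // ExecNode B after the 1.16 operator-state change: 1.16 is added as a
    // new "start version" for savepoints, while the plan format is unchanged.
    @ExecNodeMetadata(
            name = "B",
            supportedPlanChanges = {"1.15"},
            supportedSavepointChanges = {"1.15", "1.16"})
    static class ExecNodeB {}

    // Returns true if the given Flink version is one of the listed start
    // versions, i.e. a version in which this node's savepoint layout
    // changed in a supported way.
    static boolean isSavepointStartVersion(Class<?> node, String version) {
        ExecNodeMetadata meta = node.getAnnotation(ExecNodeMetadata.class);
        return Arrays.asList(meta.supportedSavepointChanges()).contains(version);
    }

    public static void main(String[] args) {
        System.out.println(isSavepointStartVersion(ExecNodeB.class, "1.15")); // true
        System.out.println(isSavepointStartVersion(ExecNodeB.class, "1.16")); // true
        System.out.println(isSavepointStartVersion(ExecNodeB.class, "1.14")); // false
    }
}
```

Since the listed versions are start versions, no end versions are needed: an ExecNode stops being listed only when it is dropped from the code base, exactly as Timo notes above.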